"""MCP 用户目标拆解与规划"""
import logging
from collections.abc import AsyncGenerator
from typing import Any
from jinja2 import BaseLoader
from jinja2.sandbox import SandboxedEnvironment
from apps.llm.reasoning import ReasoningLLM
from apps.scheduler.mcp_agent.base import MCPBase
from apps.scheduler.mcp_agent.prompt import (
CHANGE_ERROR_MESSAGE_TO_DESCRIPTION,
CREATE_PLAN,
EVALUATE_GOAL,
FINAL_ANSWER,
GEN_STEP,
GENERATE_FLOW_NAME,
GENERATE_FLOW_EXCUTE_RISK,
GET_MISSING_PARAMS,
GET_REPLAN_START_STEP_INDEX,
IS_PARAM_ERROR,
RECREATE_PLAN,
RISK_EVALUATE,
TOOL_EXECUTE_ERROR_TYPE_ANALYSIS,
TOOL_SKIP,
)
from apps.schemas.enum_var import LanguageType
from apps.scheduler.slot.slot import Slot
from apps.schemas.mcp import (
GoalEvaluationResult,
FlowName,
FlowRisk,
IsParamError,
MCPPlan,
MCPTool,
RestartStepIndex,
Step,
ToolExcutionErrorType,
ToolRisk,
ToolSkip,
)
from apps.schemas.task import Task
# Shared sandboxed Jinja2 environment used to render every prompt template in
# this module. The code below only builds templates from in-memory strings
# via ``_env.from_string``; the loader is a bare ``BaseLoader``.
_env = SandboxedEnvironment(
    # Pass a loader *instance*, not the class: the environment invokes loader
    # methods as bound methods, so handing it the class itself would raise a
    # confusing TypeError on any loader-driven lookup.
    loader=BaseLoader(),
    # The rendered output is sent to the LLM as prompt text, so HTML
    # auto-escaping is switched off.
    autoescape=False,
    trim_blocks=True,
    lstrip_blocks=True,
)
logger = logging.getLogger(__name__)
class MCPPlanner(MCPBase):
    """Break a user goal down into an MCP plan and reason about its execution.

    Every method is a static coroutine (``generate_answer`` is a static async
    generator). The common pattern is: pick the prompt template for the
    requested ``language``, render it with ``_env``, send the prompt to the
    reasoning LLM through ``get_resoning_result`` and, where a structured
    answer is needed, convert the raw text with ``_parse_result`` before
    validating it against a pydantic model. ``get_resoning_result`` and
    ``_parse_result`` are not defined in this class, so they are resolved
    through ``MCPBase``.

    NOTE(review): the ``ReasoningLLM()`` defaults in the signatures are
    evaluated once, when the class body is executed. Calls that omit the
    argument therefore share one default instance per method. The signatures
    are kept unchanged for caller compatibility.
    """

    @staticmethod
    async def evaluate_goal(
        goal: str,
        tool_list: list[MCPTool],
        resoning_llm: ReasoningLLM = ReasoningLLM(),
        language: LanguageType = LanguageType.CHINESE,
    ) -> GoalEvaluationResult:
        """Evaluate the feasibility of the user goal.

        Args:
            goal: The user goal to evaluate.
            tool_list: Tools that are rendered into the evaluation prompt.
            resoning_llm: Reasoning LLM used to answer the prompt.
            language: Selects which variant of the prompt template is used.

        Returns:
            The validated ``GoalEvaluationResult``.
        """
        raw_answer = await MCPPlanner._get_reasoning_evaluation(
            goal, tool_list, resoning_llm, language
        )
        return await MCPPlanner._parse_evaluation_result(raw_answer)

    @staticmethod
    async def _get_reasoning_evaluation(
        goal: str,
        tool_list: list[MCPTool],
        resoning_llm: ReasoningLLM = ReasoningLLM(),
        language: LanguageType = LanguageType.CHINESE,
    ) -> str:
        """Render the ``EVALUATE_GOAL`` prompt and return the LLM's raw answer."""
        template = _env.from_string(EVALUATE_GOAL[language])
        prompt = template.render(goal=goal, tools=tool_list)
        return await MCPPlanner.get_resoning_result(prompt, resoning_llm)

    @staticmethod
    async def _parse_evaluation_result(result: str) -> GoalEvaluationResult:
        """Parse the raw reasoning text into a ``GoalEvaluationResult``."""
        schema = GoalEvaluationResult.model_json_schema()
        parsed = await MCPPlanner._parse_result(result, schema)
        return GoalEvaluationResult.model_validate(parsed)

    @staticmethod
    async def get_flow_name(
        user_goal: str,
        resoning_llm: ReasoningLLM = ReasoningLLM(),
        language: LanguageType = LanguageType.CHINESE,
    ) -> FlowName:
        """Get a name for the current flow.

        Args:
            user_goal: The user goal the flow is created for.
            resoning_llm: Reasoning LLM used to answer the prompt.
            language: Selects which variant of the prompt template is used.

        Returns:
            The validated ``FlowName``.
        """
        raw_answer = await MCPPlanner._get_reasoning_flow_name(
            user_goal, resoning_llm, language
        )
        parsed = await MCPPlanner._parse_result(raw_answer, FlowName.model_json_schema())
        return FlowName.model_validate(parsed)

    @staticmethod
    async def _get_reasoning_flow_name(
        user_goal: str,
        resoning_llm: ReasoningLLM = ReasoningLLM(),
        language: LanguageType = LanguageType.CHINESE,
    ) -> str:
        """Render the ``GENERATE_FLOW_NAME`` prompt and return the LLM's raw answer."""
        template = _env.from_string(GENERATE_FLOW_NAME[language])
        prompt = template.render(goal=user_goal)
        return await MCPPlanner.get_resoning_result(prompt, resoning_llm)

    @staticmethod
    async def get_flow_excute_risk(
        user_goal: str,
        tools: list[MCPTool],
        resoning_llm: ReasoningLLM = ReasoningLLM(),
        language: LanguageType = LanguageType.CHINESE,
    ) -> FlowRisk:
        """Get the risk assessment for the current flow.

        Args:
            user_goal: The user goal the flow is created for.
            tools: Tools that are rendered into the risk prompt.
            resoning_llm: Reasoning LLM used to answer the prompt.
            language: Selects which variant of the prompt template is used.

        Returns:
            The validated ``FlowRisk``.
        """
        raw_answer = await MCPPlanner._get_reasoning_flow_risk(
            user_goal, tools, resoning_llm, language
        )
        parsed = await MCPPlanner._parse_result(raw_answer, FlowRisk.model_json_schema())
        return FlowRisk.model_validate(parsed)

    @staticmethod
    async def _get_reasoning_flow_risk(
        user_goal: str,
        tools: list[MCPTool],
        resoning_llm: ReasoningLLM = ReasoningLLM(),
        language: LanguageType = LanguageType.CHINESE,
    ) -> str:
        """Render the ``GENERATE_FLOW_EXCUTE_RISK`` prompt and return the raw answer."""
        template = _env.from_string(GENERATE_FLOW_EXCUTE_RISK[language])
        prompt = template.render(goal=user_goal, tools=tools)
        return await MCPPlanner.get_resoning_result(prompt, resoning_llm)

    @staticmethod
    async def get_replan_start_step_index(
        user_goal: str,
        error_message: str,
        current_plan: MCPPlan | None = None,
        history: str = "",
        reasoning_llm: ReasoningLLM = ReasoningLLM(),
        language: LanguageType = LanguageType.CHINESE,
    ) -> RestartStepIndex:
        """Get the index of the step from which re-planning should start.

        Args:
            user_goal: The user goal being pursued.
            error_message: The error that triggered the re-plan.
            current_plan: The plan being revised. Although the signature
                allows ``None``, a plan is required.
            history: Execution history rendered into the prompt.
            reasoning_llm: Reasoning LLM used to answer the prompt.
            language: Selects which variant of the prompt template is used.

        Returns:
            The validated ``RestartStepIndex``.

        Raises:
            ValueError: If ``current_plan`` is ``None``.
        """
        if current_plan is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise ValueError("current_plan is required to determine the replan start step index")

        template = _env.from_string(GET_REPLAN_START_STEP_INDEX[language])
        prompt = template.render(
            goal=user_goal,
            error_message=error_message,
            current_plan=current_plan.model_dump(exclude_none=True, by_alias=True),
            history=history,
        )
        raw_answer = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)

        # Constrain the index to the steps of the current plan.
        schema = RestartStepIndex.model_json_schema()
        index_schema = schema["properties"]["start_index"]
        index_schema["minimum"] = 0
        # Clamp to 0 so an empty plan does not yield maximum < minimum.
        index_schema["maximum"] = max(len(current_plan.plans) - 1, 0)

        parsed = await MCPPlanner._parse_result(raw_answer, schema)
        return RestartStepIndex.model_validate(parsed)

    @staticmethod
    async def create_plan(
        user_goal: str,
        is_replan: bool = False,
        error_message: str = "",
        current_plan: MCPPlan | None = None,
        tool_list: list[MCPTool] | None = None,
        max_steps: int = 6,
        reasoning_llm: ReasoningLLM = ReasoningLLM(),
        language: LanguageType = LanguageType.CHINESE,
    ) -> MCPPlan:
        """Plan the execution flow for the user goal.

        Args:
            user_goal: The user goal to plan for.
            is_replan: When true, revise ``current_plan`` instead of creating
                a plan from scratch.
            error_message: The error that triggered the re-plan.
            current_plan: The plan to revise; required when ``is_replan`` is
                true.
            tool_list: Tools rendered into the prompt; ``None`` means no
                tools.
            max_steps: Upper bound on the number of plan steps.
            reasoning_llm: Reasoning LLM used to answer the prompt.
            language: Selects which variant of the prompt template is used.

        Returns:
            The validated ``MCPPlan``.

        Raises:
            ValueError: If ``is_replan`` is true and ``current_plan`` is
                ``None``.
        """
        raw_answer = await MCPPlanner._get_reasoning_plan(
            user_goal,
            is_replan,
            error_message,
            current_plan,
            tool_list,
            max_steps,
            reasoning_llm,
            language,
        )
        return await MCPPlanner._parse_plan_result(raw_answer, max_steps)

    @staticmethod
    async def _get_reasoning_plan(
        user_goal: str,
        is_replan: bool = False,
        error_message: str = "",
        current_plan: MCPPlan | None = None,
        tool_list: list[MCPTool] | None = None,
        max_steps: int = 10,
        reasoning_llm: ReasoningLLM = ReasoningLLM(),
        language: LanguageType = LanguageType.CHINESE,
    ) -> str:
        """Render the plan (or re-plan) prompt and return the LLM's raw answer.

        Uses ``RECREATE_PLAN`` when ``is_replan`` is true and ``CREATE_PLAN``
        otherwise. Note that the ``max_steps`` default here (10) differs from
        the one in ``create_plan`` (6); ``create_plan`` always passes its own
        value.

        Raises:
            ValueError: If ``is_replan`` is true and ``current_plan`` is
                ``None``.
        """
        # Normalise the optional argument so the template always sees a list.
        tools = [] if tool_list is None else tool_list

        if is_replan:
            if current_plan is None:
                # Fail with a clear message instead of an AttributeError on None.
                raise ValueError("current_plan is required when is_replan is True")
            template = _env.from_string(RECREATE_PLAN[language])
            prompt = template.render(
                current_plan=current_plan.model_dump(exclude_none=True, by_alias=True),
                error_message=error_message,
                goal=user_goal,
                tools=tools,
                max_num=max_steps,
            )
        else:
            template = _env.from_string(CREATE_PLAN[language])
            prompt = template.render(
                goal=user_goal,
                tools=tools,
                max_num=max_steps,
            )
        return await MCPPlanner.get_resoning_result(prompt, reasoning_llm)

    @staticmethod
    async def _parse_plan_result(result: str, max_steps: int) -> MCPPlan:
        """Parse the raw reasoning text into an ``MCPPlan`` of at most ``max_steps`` steps."""
        schema = MCPPlan.model_json_schema()
        # Cap the number of plan entries in the schema used for parsing.
        schema["properties"]["plans"]["maxItems"] = max_steps
        parsed = await MCPPlanner._parse_result(result, schema)
        return MCPPlan.model_validate(parsed)

    @staticmethod
    async def create_next_step(
        goal: str,
        history: str,
        tools: list[MCPTool],
        reasoning_llm: ReasoningLLM = ReasoningLLM(),
        language: LanguageType = LanguageType.CHINESE,
    ) -> Step:
        """Create the next step to execute.

        Args:
            goal: The user goal being pursued.
            history: Execution history rendered into the prompt.
            tools: Candidate tools; their ids are added to the ``tool_id``
                enum of the schema used for parsing.
            reasoning_llm: Reasoning LLM used to answer the prompt.
            language: Selects which variant of the prompt template is used.

        Returns:
            The validated ``Step``.
        """
        template = _env.from_string(GEN_STEP[language])
        prompt = template.render(goal=goal, history=history, tools=tools)
        raw_answer = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)

        schema = Step.model_json_schema()
        # Add the ids of the offered tools to the allowed ``tool_id`` values.
        schema["properties"]["tool_id"].setdefault("enum", []).extend(
            tool.id for tool in tools
        )

        parsed = await MCPPlanner._parse_result(raw_answer, schema)
        logger.info("[MCPPlanner] 创建下一步的执行步骤: %s", parsed)
        return Step.model_validate(parsed)

    @staticmethod
    async def tool_skip(
        task: Task,
        step_id: str,
        step_name: str,
        step_instruction: str,
        step_content: str,
        reasoning_llm: ReasoningLLM = ReasoningLLM(),
        language: LanguageType = LanguageType.CHINESE,
    ) -> ToolSkip:
        """Decide whether the current step should be skipped.

        Args:
            task: The running task; its assembled memory and
                ``runtime.question`` are rendered into the prompt.
            step_id: Identifier of the step under consideration.
            step_name: Name of the step.
            step_instruction: Instruction text of the step.
            step_content: Content of the step.
            reasoning_llm: Reasoning LLM used to answer the prompt.
            language: Selects which variant of the prompt template is used.

        Returns:
            The validated ``ToolSkip``.
        """
        template = _env.from_string(TOOL_SKIP[language])
        # Imported at call time rather than at module top — presumably to
        # avoid a circular import with the host module; verify before moving.
        from apps.scheduler.mcp_agent.host import MCPHost

        history = await MCPHost.assemble_memory(task)
        prompt = template.render(
            step_id=step_id,
            step_name=step_name,
            step_instruction=step_instruction,
            step_content=step_content,
            history=history,
            goal=task.runtime.question,
        )
        raw_answer = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
        parsed = await MCPPlanner._parse_result(raw_answer, ToolSkip.model_json_schema())
        return ToolSkip.model_validate(parsed)

    @staticmethod
    async def get_tool_risk(
        tool: MCPTool,
        input_parm: dict[str, Any],
        additional_info: str = "",
        resoning_llm: ReasoningLLM = ReasoningLLM(),
        language: LanguageType = LanguageType.CHINESE,
    ) -> ToolRisk:
        """Get the risk assessment for an MCP tool call.

        Args:
            tool: The tool whose name and description go into the prompt.
            input_parm: Input parameters of the call (parameter name kept
                as-is for caller compatibility).
            additional_info: Extra context rendered into the prompt.
            resoning_llm: Reasoning LLM used to answer the prompt.
            language: Selects which variant of the prompt template is used.

        Returns:
            The validated ``ToolRisk``.
        """
        raw_answer = await MCPPlanner._get_reasoning_risk(
            tool, input_parm, additional_info, resoning_llm, language
        )
        return await MCPPlanner._parse_risk_result(raw_answer)

    @staticmethod
    async def _get_reasoning_risk(
        tool: MCPTool,
        input_param: dict[str, Any],
        additional_info: str,
        resoning_llm: ReasoningLLM,
        language: LanguageType = LanguageType.CHINESE,
    ) -> str:
        """Render the ``RISK_EVALUATE`` prompt and return the LLM's raw answer."""
        template = _env.from_string(RISK_EVALUATE[language])
        prompt = template.render(
            tool_name=tool.name,
            tool_description=tool.description,
            input_param=input_param,
            additional_info=additional_info,
        )
        return await MCPPlanner.get_resoning_result(prompt, resoning_llm)

    @staticmethod
    async def _parse_risk_result(result: str) -> ToolRisk:
        """Parse the raw reasoning text into a ``ToolRisk``."""
        schema = ToolRisk.model_json_schema()
        parsed = await MCPPlanner._parse_result(result, schema)
        return ToolRisk.model_validate(parsed)

    @staticmethod
    async def _get_reasoning_tool_execute_error_type(
        user_goal: str,
        current_plan: MCPPlan,
        tool: MCPTool,
        input_param: dict[str, Any],
        error_message: str,
        reasoning_llm: ReasoningLLM = ReasoningLLM(),
        language: LanguageType = LanguageType.CHINESE,
    ) -> str:
        """Render the ``TOOL_EXECUTE_ERROR_TYPE_ANALYSIS`` prompt and return the raw answer."""
        template = _env.from_string(TOOL_EXECUTE_ERROR_TYPE_ANALYSIS[language])
        prompt = template.render(
            goal=user_goal,
            current_plan=current_plan.model_dump(exclude_none=True, by_alias=True),
            tool_name=tool.name,
            tool_description=tool.description,
            input_param=input_param,
            error_message=error_message,
        )
        return await MCPPlanner.get_resoning_result(prompt, reasoning_llm)

    @staticmethod
    async def _parse_tool_execute_error_type_result(result: str) -> ToolExcutionErrorType:
        """Parse the raw reasoning text into a ``ToolExcutionErrorType``."""
        schema = ToolExcutionErrorType.model_json_schema()
        parsed = await MCPPlanner._parse_result(result, schema)
        return ToolExcutionErrorType.model_validate(parsed)

    @staticmethod
    async def get_tool_execute_error_type(
        user_goal: str,
        current_plan: MCPPlan,
        tool: MCPTool,
        input_param: dict[str, Any],
        error_message: str,
        reasoning_llm: ReasoningLLM = ReasoningLLM(),
        language: LanguageType = LanguageType.CHINESE,
    ) -> ToolExcutionErrorType:
        """Get the error type of a failed MCP tool execution.

        Args:
            user_goal: The user goal being pursued.
            current_plan: The plan in which the tool was executed.
            tool: The tool that failed.
            input_param: Input parameters the tool was called with.
            error_message: The error reported by the tool.
            reasoning_llm: Reasoning LLM used to answer the prompt.
            language: Selects which variant of the prompt template is used.

        Returns:
            The validated ``ToolExcutionErrorType``.
        """
        raw_answer = await MCPPlanner._get_reasoning_tool_execute_error_type(
            user_goal, current_plan, tool, input_param, error_message, reasoning_llm, language
        )
        return await MCPPlanner._parse_tool_execute_error_type_result(raw_answer)

    @staticmethod
    async def is_param_error(
        goal: str,
        history: str,
        error_message: str,
        tool: MCPTool,
        step_description: str,
        input_params: dict[str, Any],
        reasoning_llm: ReasoningLLM = ReasoningLLM(),
        language: LanguageType = LanguageType.CHINESE,
    ) -> IsParamError:
        """Decide whether an error message describes a parameter error.

        Args:
            goal: The user goal being pursued.
            history: Execution history rendered into the prompt.
            error_message: The error to classify.
            tool: The tool whose id and name are rendered as the step id
                and step name.
            step_description: Description of the failing step.
            input_params: Input parameters the step was run with.
            reasoning_llm: Reasoning LLM used to answer the prompt.
            language: Selects which variant of the prompt template is used.

        Returns:
            The validated ``IsParamError``.
        """
        template = _env.from_string(IS_PARAM_ERROR[language])
        prompt = template.render(
            goal=goal,
            history=history,
            step_id=tool.id,
            step_name=tool.name,
            step_description=step_description,
            input_params=input_params,
            error_message=error_message,
        )
        raw_answer = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
        parsed = await MCPPlanner._parse_result(raw_answer, IsParamError.model_json_schema())
        return IsParamError.model_validate(parsed)

    @staticmethod
    async def change_err_message_to_description(
        error_message: str,
        tool: MCPTool,
        input_params: dict[str, Any],
        reasoning_llm: ReasoningLLM = ReasoningLLM(),
        language: LanguageType = LanguageType.CHINESE,
    ) -> str:
        """Turn an error message into a description.

        Renders ``CHANGE_ERROR_MESSAGE_TO_DESCRIPTION`` with the error, the
        tool's name, description and input schema, and the input parameters.

        Returns:
            The LLM's raw answer, without further parsing.
        """
        template = _env.from_string(CHANGE_ERROR_MESSAGE_TO_DESCRIPTION[language])
        prompt = template.render(
            error_message=error_message,
            tool_name=tool.name,
            tool_description=tool.description,
            input_schema=tool.input_schema,
            input_params=input_params,
        )
        return await MCPPlanner.get_resoning_result(prompt, reasoning_llm)

    @staticmethod
    async def get_missing_param(
        tool: MCPTool,
        input_param: dict[str, Any],
        error_message: str,
        reasoning_llm: ReasoningLLM = ReasoningLLM(),
        language: LanguageType = LanguageType.CHINESE,
    ) -> dict[str, Any]:
        """Get the missing parameters of a tool call.

        The tool's input schema is widened with ``Slot.add_null_to_basic_types``
        and used both in the prompt and as the schema for parsing the answer.

        Args:
            tool: The tool whose input schema is inspected.
            input_param: Input parameters the tool was called with.
            error_message: The error reported by the tool.
            reasoning_llm: Reasoning LLM used to answer the prompt.
            language: Selects which variant of the prompt template is used.

        Returns:
            The output of ``_parse_result`` for the widened schema.
            NOTE(review): presumably a mapping of the tool's parameters in
            which missing ones are null — confirm against ``_parse_result``
            and the ``GET_MISSING_PARAMS`` prompt.
        """
        slot = Slot(schema=tool.input_schema)
        template = _env.from_string(GET_MISSING_PARAMS[language])
        schema_with_null = slot.add_null_to_basic_types()
        prompt = template.render(
            tool_name=tool.name,
            tool_description=tool.description,
            input_param=input_param,
            schema=schema_with_null,
            error_message=error_message,
        )
        raw_answer = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
        return await MCPPlanner._parse_result(raw_answer, schema_with_null)

    @staticmethod
    async def generate_answer(
        user_goal: str,
        memory: str,
        resoning_llm: ReasoningLLM = ReasoningLLM(),
        language: LanguageType = LanguageType.CHINESE,
    ) -> AsyncGenerator[str, None]:
        """Generate the final answer as a stream.

        Args:
            user_goal: The user goal being answered.
            memory: Memory text rendered into the prompt.
            resoning_llm: Reasoning LLM whose ``call`` method is streamed.
            language: Selects which variant of the prompt template is used.

        Yields:
            Each chunk produced by ``resoning_llm.call``, unchanged.
        """
        template = _env.from_string(FINAL_ANSWER[language])
        prompt = template.render(memory=memory, goal=user_goal)
        messages = [{"role": "user", "content": prompt}]
        async for chunk in resoning_llm.call(
            messages,
            streaming=True,
            temperature=0.07,
            result_only=False,
            enable_thinking=True,
        ):
            yield chunk