"""
MCP Server Tool Evaluator (AgentCPM-MCP Version)
Used to evaluate tool availability and performance on MCP servers
"""
import os
import sys
import asyncio
import json
import logging
import random
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple, Set, Union
# Locate this file and derive the directories above it: the folder holding
# this script (script_dir), its parent (src_dir) and the parent of that
# (project_root).
file_path = Path(__file__).resolve()
script_dir = file_path.parent
src_dir = script_dir.parent
project_root = src_dir.parent
# Prepend both directories to sys.path ahead of the project imports below;
# presumably this is what lets the `src.*` imports resolve when the file is
# run directly -- the inserts must therefore stay above those imports.
sys.path.insert(0, str(project_root))
sys.path.insert(0, str(src_dir))
from src.mcp_manager import MCPManager
from src.extended_openai_client import get_extended_llm_client
# Module-wide logger; its level is set in ToolEvaluator.__init__.
logger = logging.getLogger("tool_evaluator")
class ToolEvaluator:
"""
MCP Server Tool Evaluator (AgentCPM-MCP Version)
Used to evaluate tool availability and performance on MCP servers
"""
def __init__(
    self,
    manager_url: str = "http://localhost:8000/mcpapi",
    provider: str = "openai",
    model: str = "gpt-4o-mini",
    api_key: Optional[str] = None,
    base_url: Optional[str] = None,
    results_dir: str = None,
    log_level: str = "INFO",
):
    """
    Set up the evaluator: log level, MCPManager client, LLM client and
    the directory that receives result files.

    Args:
        manager_url: URL of the MCPManager API
        provider: Name of the LLM provider
        model: Name of the LLM model
        api_key: API key handed to the LLM client
        base_url: Base URL handed to the LLM client
        results_dir: Directory for result files; when empty, a timestamped
            folder below ``<project_root>/evaluation_results`` is used
        log_level: Name of a ``logging`` level; unrecognised names mean INFO
    """
    # Anything that does not resolve to a numeric logging level becomes INFO.
    level_candidate = getattr(logging, log_level.upper(), None)
    logger.setLevel(level_candidate if isinstance(level_candidate, int) else logging.INFO)

    self.manager = MCPManager(manager_url=manager_url)
    self.llm_client = get_extended_llm_client(
        provider=provider,
        model=model,
        api_key=api_key,
        base_url=base_url,
    )
    self.provider, self.model = provider, model
    self.api_key, self.base_url = api_key, base_url

    # Tool catalogue; populated by initialize().
    self.all_tools = []
    self.tools_by_server = {}

    if results_dir:
        self.results_dir = Path(results_dir)
    else:
        run_stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.results_dir = project_root / "evaluation_results" / run_stamp
    os.makedirs(self.results_dir, exist_ok=True)
    logger.info(f"Results will be saved to: {self.results_dir}")
async def initialize(self) -> bool:
    """
    Connect to MCPManager and cache the tool catalogue.

    Stores the manager's OpenAI-format tool list in ``self.all_tools`` and
    the per-server tool lists in ``self.tools_by_server``.

    Returns:
        bool: True when everything loaded, False when the manager refused
        to initialize or any step raised (the traceback is logged)
    """
    try:
        logger.info("Initializing MCPManager client...")
        manager_ready = await self.manager.initialize()
        if not manager_ready:
            logger.error("MCPManager client initialization failed")
            return False

        self.all_tools = self.manager.openai_tools
        logger.info(f"Retrieved {len(self.all_tools)} tools from MCPManager")

        for server_id in await self.manager.list_servers():
            tools_for_server = await self.manager.get_server_tools(server_id)
            self.tools_by_server[server_id] = tools_for_server
            logger.info(f"Server '{server_id}' has {len(tools_for_server)} tools")
    except Exception as exc:
        logger.error(f"Initialization failed: {str(exc)}")
        import traceback
        logger.error(traceback.format_exc())
        return False
    else:
        return True
async def close(self):
    """Shut down the MCPManager client if one is attached to this evaluator."""
    active_manager = getattr(self, "manager", None)
    if not active_manager:
        # Nothing was created (or it is already gone): nothing to release.
        return
    await active_manager.close()
    logger.info("MCPManager client closed")
def find_tool_by_name(self, tool_name: str, server_name: Optional[str] = None) -> Tuple[Optional[str], Optional[Dict]]:
    """
    Find a tool definition by its function name.

    When ``server_name`` is given and known, only that server is searched.
    When it is omitted or unknown, every server in ``self.tools_by_server``
    is searched in insertion order and the first match wins.

    Entries that are not OpenAI-style tool dicts (``{"function": {...}}``)
    are skipped instead of raising, so one malformed entry cannot hide the
    remaining tools.

    Args:
        tool_name: Tool name
        server_name: Server name (optional)
    Returns:
        Tuple[Optional[str], Optional[Dict]]: Server name and tool
        definition, or (None, None) if not found
    """
    def is_match(tool: Any) -> bool:
        # Only dict entries carrying a dict under "function" can match.
        if not isinstance(tool, dict):
            return False
        function_def = tool.get("function")
        return isinstance(function_def, dict) and function_def.get("name") == tool_name

    if server_name and server_name in self.tools_by_server:
        search_space = {server_name: self.tools_by_server[server_name]}
    else:
        search_space = self.tools_by_server

    for server, tools in search_space.items():
        # A server may be registered with no tool list at all.
        for tool in tools or ():
            if is_match(tool):
                return server, tool
    return None, None
def generate_test_prompt(self, tool_schema: Dict) -> str:
    """
    Build a natural-language prompt asking for a test case for a tool.

    The parameter section is only emitted when the schema actually declares
    properties, so tools without arguments (``"properties": {}``) no longer
    get an empty "requires the following parameters" list.

    Args:
        tool_schema: Tool schema in OpenAI format (``{"function": {...}}``)
    Returns:
        str: Test prompt; a short generic prompt if the schema cannot be read
    """
    try:
        # `or {}` / `or []` also covers keys that are present but null.
        function_def = tool_schema.get("function") or {}
        tool_name = function_def.get("name", "unknown_tool")
        description = function_def.get("description", "No description available")
        parameters = function_def.get("parameters") or {}
        properties = parameters.get("properties") or {}
        required_params = parameters.get("required") or []

        sections = [
            f"I need to test a tool named '{tool_name}'.\n\n",
            f"Tool description: {description}\n\n",
        ]
        if properties:
            sections.append("This tool requires the following parameters:\n")
            for param_name, param_info in properties.items():
                if not isinstance(param_info, dict):
                    # e.g. a boolean sub-schema: nothing to describe
                    param_info = {}
                param_type = param_info.get("type", "unknown")
                param_desc = param_info.get("description", "No description")
                entry = f"- {param_name} ({param_type}): {param_desc}"
                if param_name in required_params:
                    entry += " (required)"
                sections.append(entry + "\n")
        sections.append(
            "\nPlease generate a valid test case to call this tool. Provide reasonable parameter values and explain why these values are appropriate for testing this tool."
        )
        return "".join(sections)
    except Exception as e:
        logger.error(f"Error generating test prompt: {str(e)}")
        # Resolve the name defensively so the fallback itself cannot raise.
        fallback_name = "unknown_tool"
        if isinstance(tool_schema, dict) and isinstance(tool_schema.get("function"), dict):
            fallback_name = tool_schema["function"].get("name", "unknown_tool")
        return f"Please test the tool named '{fallback_name}' and provide appropriate parameters."
async def generate_test_parameters(self, tool_schema: Dict) -> Dict[str, Any]:
    """
    Use the LLM to generate test parameters for a tool.

    The LLM is asked to emit exactly one tool call for ``tool_schema``; the
    arguments of that call become the test parameters. Whenever the answer
    is unusable (no tool call, wrong tool name, invalid JSON, arguments
    that are not a JSON object, or any exception) the placeholder values
    from ``_generate_default_parameters`` are returned instead.

    Args:
        tool_schema: Tool schema in OpenAI format (``{"function": {...}}``)
    Returns:
        Dict[str, Any]: Test parameters
    """
    try:
        function_def = tool_schema.get("function", {})
        tool_name = function_def.get("name", "unknown_tool")
        tool_description = function_def.get("description", "")
        system_prompt = (
            "You are an AI assistant specialized in generating tool calls. Your task is to generate an appropriate tool call for the following tool.\n"
            "Do not describe or explain tool calls in the response content, but generate tool calls directly through the tool_calls function. Your content should be empty (null) or briefly explain what you will execute.\n"
            "Tool information:\n"
            f"Name: {tool_name}\n"
            f"Description: {tool_description}\n"
        )

        param_schema = function_def.get("parameters")
        if param_schema:
            logger.debug(f"Tool {tool_name} parameter schema: {json.dumps(param_schema, ensure_ascii=False)[:100]}...")
            required_params = param_schema.get("required", []) if isinstance(param_schema, dict) else []
            system_prompt += f"\nParameter schema: {json.dumps(param_schema, ensure_ascii=False, indent=2)}"
            if required_params:
                # str() keeps a malformed (non-string) entry from aborting the prompt
                system_prompt += f"\n\nRequired parameters: {', '.join(str(name) for name in required_params)}"
        else:
            logger.warning(f"Tool {tool_name} has no parameter schema definition")

        system_prompt += (
            "\nImportant notes:\n"
            "1. Directly use the tool_calls function to generate tool calls, do not describe tool calls in content\n"
            "2. Provide all required parameters, ensure parameter values meet schema requirements\n"
            "3. Do not use placeholders, use actual, reasonable values\n"
            "4. If the tool requires code parameters, provide valid, short code examples\n"
            "5. Your response should have one tool call, and content field can be null or brief explanation"
        )
        user_prompt = f"Please imagine a simple scenario corresponding to this tool and generate a valid tool call for tool '{tool_name}'. Remember to use the tool_calls function instead of describing in content."
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]

        logger.info(f"Generating test parameters for tool '{tool_name}'...")
        # NOTE(review): the result is used without await, so this call looks
        # synchronous and would hold the event loop while the LLM answers -- confirm.
        response = self.llm_client.create_completion(
            messages=messages,
            tools=[tool_schema]
        )

        tool_calls = response.get("tool_calls", [])
        if not tool_calls:
            logger.warning("LLM did not generate tool calls, will use default parameters")
            return self._generate_default_parameters(tool_schema)

        tool_call = tool_calls[0]
        if not (isinstance(tool_call, dict) and isinstance(tool_call.get("function"), dict)):
            logger.warning("Unable to parse LLM tool calls, will use default parameters")
            return self._generate_default_parameters(tool_schema)

        called_function = tool_call["function"]
        tool_name_called = called_function.get("name", "")
        raw_arguments = called_function.get("arguments", "{}")
        if tool_name_called != tool_name:
            logger.warning(f"Tool name mismatch: expected {tool_name}, got {tool_name_called}, will use default parameters")
            return self._generate_default_parameters(tool_schema)

        try:
            parameters = json.loads(raw_arguments) if isinstance(raw_arguments, str) else raw_arguments
        except json.JSONDecodeError as e:
            logger.warning(f"Parameter parsing error: {str(e)}, raw arguments: {raw_arguments}")
            return self._generate_default_parameters(tool_schema)

        # Valid JSON is not necessarily an object ("null", "[]", "3", ...);
        # only a mapping can serve as keyword parameters for the tool.
        if not isinstance(parameters, dict):
            logger.warning(
                f"Generated arguments are not a JSON object (got {type(parameters).__name__}), will use default parameters"
            )
            return self._generate_default_parameters(tool_schema)
        return parameters
    except Exception as e:
        logger.error(f"Error generating test parameters: {str(e)}")
        return self._generate_default_parameters(tool_schema)
def _generate_default_parameters(self, tool_schema: Dict) -> Dict[str, Any]:
    """
    Generate placeholder parameters for a tool from its JSON schema.

    For every declared property the value is chosen in this order:
    the schema's ``default``, the first ``enum`` entry, then a placeholder
    for the declared ``type`` (string -> ``"test_<name>"``, number/integer
    -> 1, boolean -> True, array -> [], object -> {}). A missing type is
    treated as string; for a union type such as ``["string", "null"]`` the
    first non-null member is used. Properties of any other type are left out.

    Args:
        tool_schema: Tool schema in OpenAI format (``{"function": {...}}``)
    Returns:
        Dict[str, Any]: Default parameters (empty if nothing is declared)
    """
    # Factories rather than values so every call gets fresh containers.
    placeholder_factories = {
        "string": lambda name: f"test_{name}",
        "number": lambda name: 1,
        "integer": lambda name: 1,
        "boolean": lambda name: True,
        "array": lambda name: [],
        "object": lambda name: {},
    }

    # `or {}` also covers keys that are present but null.
    function_def = tool_schema.get("function") or {}
    param_schema = function_def.get("parameters") or {}
    param_defs = param_schema.get("properties") or {}

    parameters: Dict[str, Any] = {}
    for param_name, param_info in param_defs.items():
        if not isinstance(param_info, dict):
            # e.g. a boolean sub-schema: no constraints, treat as untyped
            param_info = {}

        # Values taken from the schema itself are valid by construction.
        if "default" in param_info:
            parameters[param_name] = param_info["default"]
            continue
        enum_values = param_info.get("enum")
        if isinstance(enum_values, list) and enum_values:
            parameters[param_name] = enum_values[0]
            continue

        param_type = param_info.get("type", "string")
        if isinstance(param_type, list):
            param_type = next((member for member in param_type if member != "null"), None)
        factory = placeholder_factories.get(param_type) if isinstance(param_type, str) else None
        if factory is not None:
            parameters[param_name] = factory(param_name)
    return parameters
async def evaluate_tool(self, server_name: str, tool_name: str, iterations: int = 3) -> Dict[str, Any]:
    """
    Evaluate a single tool by calling it repeatedly with generated parameters.

    Test parameters are generated once and reused for every iteration. A
    call counts as failed when it raises or when the manager returns a dict
    with ``status == "error"``. Per-iteration logs go to
    ``<results_dir>/<server>/<tool_name>_logs`` and the summary to
    ``<results_dir>/<server>/<tool_name>.json``.

    Only the tool call itself is guarded by the ``try``: a problem while
    writing a log file can no longer turn an already counted success into a
    reported error. Results that are not JSON-serializable are stored in a
    JSON-safe form so they can be written here and by callers.

    Args:
        server_name: Server name
        tool_name: Tool name
        iterations: Number of evaluation iterations
    Returns:
        Dict[str, Any]: Evaluation results; a short failure record when the
        tool cannot be found
    """
    server, tool_schema = self.find_tool_by_name(tool_name, server_name)
    if not server or not tool_schema:
        not_found = f"Tool '{tool_name}' not found on server '{server_name}'"
        logger.error(not_found)
        return {
            "tool_name": tool_name,
            "server_name": server_name,
            "success": False,
            "success_rate": 0.0,
            "error": not_found
        }

    logger.info(f"Starting evaluation of tool: '{tool_name}' (server: '{server}')")
    tool_dir = self.results_dir / server / f"{tool_name}_logs"
    os.makedirs(tool_dir, exist_ok=True)

    parameters = await self.generate_test_parameters(tool_schema)
    self._write_json(tool_dir / "test_parameters.json", parameters)

    results = []
    successful_calls = 0
    total_latency = 0
    for iteration in range(1, iterations + 1):
        logger.info(f"Tool '{tool_name}' evaluation iteration {iteration}/{iterations}")
        call_result = {
            "iteration": iteration,
            "success": False,
            "parameters": parameters,
            "latency": 0
        }
        # perf_counter is monotonic, so wall-clock adjustments cannot skew the latency.
        start_time = time.perf_counter()
        try:
            result = await self.manager.call_tool(tool_name, parameters, server)
        except Exception as e:
            call_result["latency"] = time.perf_counter() - start_time
            logger.error(f"Tool '{tool_name}' evaluation error: {str(e)}")
            call_result["error"] = str(e)
            outcome = "error"
        else:
            latency = time.perf_counter() - start_time
            call_result["latency"] = latency
            if isinstance(result, dict) and result.get("status") == "error":
                content = result.get("content")
                if isinstance(content, dict):
                    error_msg = content.get("error", "Unknown error")
                    error_detail = content.get("detail", "")
                else:
                    # Error payloads are not guaranteed to be dicts.
                    error_msg = str(content) if content else "Unknown error"
                    error_detail = ""
                logger.warning(f"Tool '{tool_name}' call failed: {error_msg}")
                call_result["error"] = error_msg
                if error_detail:
                    call_result["error_detail"] = error_detail
                outcome = "failed"
            else:
                successful_calls += 1
                total_latency += latency
                logger.info(f"Tool '{tool_name}' call successful, latency: {latency:.2f}s")
                call_result["success"] = True
                call_result["result"] = self._json_safe(result)
                outcome = "success"

        log_file = tool_dir / f"iteration_{iteration}_{outcome}.json"
        try:
            self._write_json(log_file, call_result)
        except OSError as e:
            # The per-iteration log is best effort; the measurement itself stands.
            logger.warning(f"Could not write iteration log {log_file}: {str(e)}")
        results.append(call_result)

    success_rate = successful_calls / iterations if iterations > 0 else 0
    # Latency is averaged over successful calls only.
    avg_latency = total_latency / successful_calls if successful_calls > 0 else 0
    evaluation_result = {
        "tool_name": tool_name,
        "server_name": server,
        "success": successful_calls > 0,
        "success_rate": success_rate,
        "avg_latency": avg_latency,
        "total_latency": total_latency,
        "iterations": iterations,
        "successful_calls": successful_calls,
        "failed_calls": iterations - successful_calls,
        "parameters": parameters,
        "results": results,
        "timestamp": datetime.now().isoformat()
    }
    self._write_json(self.results_dir / server / f"{tool_name}.json", evaluation_result)
    return evaluation_result

def _json_safe(self, value: Any) -> Any:
    """Return ``value`` unchanged if it is JSON-serializable, else a JSON-safe stand-in."""
    try:
        json.dumps(value)
        return value
    except (TypeError, ValueError):
        pass
    try:
        # Deliberate best effort for logging: unknown objects become their str().
        return json.loads(json.dumps(value, default=str))
    except (TypeError, ValueError):
        return str(value)

def _write_json(self, path: Path, payload: Any) -> None:
    """Write ``payload`` to ``path`` as indented UTF-8 JSON."""
    with open(path, "w", encoding="utf-8") as f:
        # default=str is a last resort so that a stray object cannot abort the run.
        json.dump(payload, f, ensure_ascii=False, indent=2, default=str)
async def evaluate_server(self, server_name: str, iterations: int = 3) -> Dict[str, Any]:
    """
    Run evaluate_tool for every named tool of one server and aggregate the outcome.

    The aggregate is also written to
    ``<results_dir>/server_<server_name>_<timestamp>.json``.

    Args:
        server_name: Server name
        iterations: Number of evaluation iterations per tool
    Returns:
        Dict[str, Any]: Aggregated evaluation results, or a short failure
        record when the server has no tools
    """
    logger.info(f"Starting evaluation of server: '{server_name}'")

    tools = next(
        (tool_list for name, tool_list in self.tools_by_server.items() if name == server_name),
        [],
    )
    if not tools:
        logger.error(f"Server '{server_name}' has no available tools")
        return {
            "server_name": server_name,
            "success": False,
            "error": f"Server '{server_name}' has no available tools"
        }

    os.makedirs(self.results_dir / server_name, exist_ok=True)

    tool_results = {}
    outcomes = []
    for tool in tools:
        # Tools may be OpenAI-style dicts, flat dicts or objects exposing .name
        if isinstance(tool, dict):
            if "function" in tool:
                tool_name = tool.get("function", {}).get("name", "")
            else:
                tool_name = tool.get("name", "")
        elif hasattr(tool, "name"):
            tool_name = tool.name
        else:
            tool_name = None

        if not tool_name:
            logger.warning(f"Skipping unknown tool: {tool}")
            continue

        outcome = await self.evaluate_tool(server_name, tool_name, iterations)
        tool_results[tool_name] = outcome
        outcomes.append(outcome)

    total_calls = sum(outcome.get("iterations", 0) for outcome in outcomes)
    successful_calls = sum(outcome.get("successful_calls", 0) for outcome in outcomes)
    total_latency = sum(outcome.get("total_latency", 0) for outcome in outcomes)

    evaluation_result = {
        "server_name": server_name,
        "success": successful_calls > 0,
        "success_rate": successful_calls / total_calls if total_calls > 0 else 0,
        "avg_latency": total_latency / successful_calls if successful_calls > 0 else 0,
        "total_latency": total_latency,
        "total_calls": total_calls,
        "successful_calls": successful_calls,
        "failed_calls": total_calls - successful_calls,
        "total_tools": len(tools),
        "tool_results": tool_results,
        "timestamp": datetime.now().isoformat()
    }

    file_stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    result_file = self.results_dir / f"server_{server_name}_{file_stamp}.json"
    with open(result_file, "w", encoding="utf-8") as f:
        json.dump(evaluation_result, f, ensure_ascii=False, indent=2)
    logger.info(f"Server '{server_name}' evaluation completed, results saved to: {result_file}")
    return evaluation_result
async def evaluate_all_servers(self, iterations: int = 3) -> Dict[str, Any]:
    """
    Evaluate every server in ``self.tools_by_server`` and combine the results.

    The combined result is written to
    ``<results_dir>/all_servers_<timestamp>.json`` and then passed to
    generate_tool_reports.

    Args:
        iterations: Number of evaluation iterations per tool
    Returns:
        Dict[str, Any]: Combined evaluation results
    """
    logger.info("Starting evaluation of all servers")

    server_results = {}
    for server_name in self.tools_by_server:
        server_results[server_name] = await self.evaluate_server(server_name, iterations)

    per_server = list(server_results.values())
    total_tools = sum(entry.get("total_tools", 0) for entry in per_server)
    total_calls = sum(entry.get("total_calls", 0) for entry in per_server)
    successful_calls = sum(entry.get("successful_calls", 0) for entry in per_server)
    total_latency = sum(entry.get("total_latency", 0) for entry in per_server)

    evaluation_result = {
        "success": successful_calls > 0,
        "success_rate": successful_calls / total_calls if total_calls > 0 else 0,
        "avg_latency": total_latency / successful_calls if successful_calls > 0 else 0,
        "total_latency": total_latency,
        "total_calls": total_calls,
        "successful_calls": successful_calls,
        "failed_calls": total_calls - successful_calls,
        "total_servers": len(self.tools_by_server),
        "total_tools": total_tools,
        "server_results": server_results,
        "timestamp": datetime.now().isoformat()
    }

    file_stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    result_file = self.results_dir / f"all_servers_{file_stamp}.json"
    with open(result_file, "w", encoding="utf-8") as f:
        json.dump(evaluation_result, f, ensure_ascii=False, indent=2)

    self.generate_tool_reports(evaluation_result)
    logger.info(f"All servers evaluation completed, results saved to: {result_file}")
    return evaluation_result
def generate_tool_reports(self, evaluation_result: Dict[str, Any]) -> None:
    """
    Write the Markdown reports and the server-to-tools mapping files.

    Produces, below ``self.results_dir``:
      * ``reports/overall_report.md`` with one table row per server
      * ``reports/server_<name>_report.md`` with one table row per tool
      * ``server_tools_mapping_<timestamp>.json``
    and a fixed-name copy of the mapping at
    ``<project_root>/src/server_eval/server_to_tools.json``. The fixed copy
    is anchored at the project root (not the current working directory) and
    its folder is created on demand, so report generation no longer fails
    when the evaluator is started from another directory.

    Args:
        evaluation_result: Result dict as returned by evaluate_all_servers
    """
    logger.info("Generating tool reports")
    reports_dir = self.results_dir / "reports"
    os.makedirs(reports_dir, exist_ok=True)

    server_results = evaluation_result.get("server_results", {})

    overall_report_file = reports_dir / "overall_report.md"
    with open(overall_report_file, "w", encoding="utf-8") as f:
        f.write(self._render_overall_report(evaluation_result, server_results))

    for server_name, server_result in server_results.items():
        with open(reports_dir / f"server_{server_name}_report.md", "w", encoding="utf-8") as f:
            f.write(self._render_server_report(server_name, server_result))

    # Map every server to the names of its tools; unnamed tools are left out.
    server_tools_mapping = {
        server_name: [name for name in map(self._tool_name_of, tools or []) if name]
        for server_name, tools in self.tools_by_server.items()
    }

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    mapping_file = self.results_dir / f"server_tools_mapping_{timestamp}.json"
    with open(mapping_file, "w", encoding="utf-8") as f:
        json.dump(server_tools_mapping, f, ensure_ascii=False, indent=2)

    fixed_mapping_file = project_root / "src" / "server_eval" / "server_to_tools.json"
    fixed_mapping_file.parent.mkdir(parents=True, exist_ok=True)
    with open(fixed_mapping_file, "w", encoding="utf-8") as f:
        json.dump(server_tools_mapping, f, ensure_ascii=False, indent=2)

    logger.info(f"Tool reports generated in directory: {reports_dir}")
    logger.info(f"Overall report saved to: {overall_report_file}")
    logger.info(f"Server tool mapping saved to file: {mapping_file}")
    logger.info(f"Fixed version saved to: {fixed_mapping_file}")

def _render_overall_report(self, evaluation_result: Dict[str, Any], server_results: Dict[str, Any]) -> str:
    """Build the Markdown text of the overall report (summary plus per-server table)."""
    lines = [
        "# MCP Tool Evaluation Report",
        "## Overall Statistics",
        f"- Evaluation time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        f"- Number of servers: {evaluation_result.get('total_servers', 0)}",
        f"- Total tools: {evaluation_result.get('total_tools', 0)}",
        f"- Total calls: {evaluation_result.get('total_calls', 0)}",
        f"- Successful calls: {evaluation_result.get('successful_calls', 0)}",
        f"- Failed calls: {evaluation_result.get('failed_calls', 0)}",
        f"- Success rate: {evaluation_result.get('success_rate', 0) * 100:.2f}%",
        f"- Average latency: {evaluation_result.get('avg_latency', 0):.2f}s",
        "## Server Statistics",
        "| Server | Tool Count | Total Calls | Successful Calls | Success Rate | Average Latency(s) |",
        "|--------|------------|-------------|------------------|--------------|-------------------|",
    ]
    for server_name, server_result in server_results.items():
        total_tools = server_result.get("total_tools", 0)
        total_calls = server_result.get("total_calls", 0)
        successful_calls = server_result.get("successful_calls", 0)
        success_rate = server_result.get("success_rate", 0) * 100
        avg_latency = server_result.get("avg_latency", 0)
        lines.append(
            f"| {server_name} | {total_tools} | {total_calls} | {successful_calls} | {success_rate:.2f}% | {avg_latency:.2f} |"
        )
    return "\n".join(lines) + "\n"

def _render_server_report(self, server_name: str, server_result: Dict[str, Any]) -> str:
    """Build the Markdown text of one server's report (summary plus per-tool table)."""
    lines = [
        f"# Server '{server_name}' Tool Evaluation Report",
        "## Overall Statistics",
        f"- Evaluation time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        f"- Total tools: {server_result.get('total_tools', 0)}",
        f"- Total calls: {server_result.get('total_calls', 0)}",
        f"- Successful calls: {server_result.get('successful_calls', 0)}",
        f"- Failed calls: {server_result.get('failed_calls', 0)}",
        f"- Success rate: {server_result.get('success_rate', 0) * 100:.2f}%",
        f"- Average latency: {server_result.get('avg_latency', 0):.2f}s",
        "## Tool Statistics",
        "| Tool Name | Total Calls | Successful Calls | Success Rate | Average Latency(s) |",
        "|-----------|-------------|------------------|--------------|-------------------|",
    ]
    for tool_name, tool_result in server_result.get("tool_results", {}).items():
        iterations = tool_result.get("iterations", 0)
        successful_calls = tool_result.get("successful_calls", 0)
        success_rate = tool_result.get("success_rate", 0) * 100
        avg_latency = tool_result.get("avg_latency", 0)
        lines.append(
            f"| {tool_name} | {iterations} | {successful_calls} | {success_rate:.2f}% | {avg_latency:.2f} |"
        )
    return "\n".join(lines) + "\n"

def _tool_name_of(self, tool: Any) -> Optional[str]:
    """Return a tool's name from an OpenAI-style dict, a flat dict or an object with ``.name``."""
    if isinstance(tool, dict):
        if "function" in tool:
            return tool.get("function", {}).get("name", "")
        return tool.get("name", "")
    if hasattr(tool, "name"):
        return tool.name
    return None