"""
MCPManager Client
This client is used to interact with the HTTP API of MCPManager
"""
import os
import json
import logging
import asyncio
import httpx
from typing import Dict, List, Any, Optional, Union
from pathlib import Path
# Module-level logger used by the MCPManager client; it is registered under the
# fixed name "mcp_manager" rather than the importing module's __name__.
logger = logging.getLogger("mcp_manager")
class MCPManager:
    """
    Client for the MCPManager HTTP API.

    Caches the server list and tool schemas fetched by ``initialize`` and
    forwards tool calls to the manager. The instance owns an
    ``httpx.AsyncClient``; release it with ``close()`` or by using the
    instance as an async context manager (``async with MCPManager() as m:``).
    """

    def __init__(self, manager_url: str = "http://localhost:8000/mcpapi", timeout: int = 30):
        """
        Create the client and its underlying HTTP connection pool.

        No request is sent here; call ``initialize`` to populate the caches.

        Args:
            manager_url: Base URL of the MCPManager API.
            timeout: Request timeout in seconds, applied to every request.
        """
        self.manager_url = manager_url
        self.timeout = timeout
        self.http_client = httpx.AsyncClient(timeout=timeout)
        # Caches filled by initialize().
        self.servers: Dict[str, Dict[str, Any]] = {}
        self.tools_by_server: Dict[str, List[Dict[str, Any]]] = {}
        self.all_tools: List[Dict[str, Any]] = []
        self.openai_tools: List[Dict[str, Any]] = []

    async def __aenter__(self) -> "MCPManager":
        """Enter an ``async with`` block; returns the client itself."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Close the HTTP client when leaving an ``async with`` block."""
        await self.close()

    def _url(self, path: str) -> str:
        """Join ``path`` onto the base URL, tolerating a trailing slash on it."""
        return f"{self.manager_url.rstrip('/')}/{path}"

    @staticmethod
    def _to_openai_tool(tool: Any) -> Optional[Dict[str, Any]]:
        """Convert one tool schema to OpenAI function format, or None if it has no usable "function" entry."""
        if not isinstance(tool, dict):
            return None
        tool_function = tool.get("function")
        if not isinstance(tool_function, dict):
            return None
        return {
            "type": "function",
            "function": {
                "name": tool_function.get("name"),
                "description": tool_function.get("description", ""),
                "parameters": tool_function.get("parameters", {}),
            },
        }

    async def initialize(self) -> bool:
        """
        Fetch servers and tools from the manager and rebuild the local caches.

        Safe to call more than once: the per-server tool map and the
        OpenAI-format tool list are rebuilt from scratch on each call rather
        than appended to. A failure while fetching one server's tools is
        logged and that server is recorded with an empty tool list; it does
        not fail the whole initialization.

        Returns:
            bool: True on success, False if the server list or the global
            tool list could not be fetched.
        """
        try:
            logger.info("Initializing MCPManager client from %s...", self.manager_url)

            servers_response = await self.http_client.get(self._url("servers"))
            servers_response.raise_for_status()
            servers_data = servers_response.json()
            self.servers = {server: {} for server in servers_data.get("servers", [])}
            logger.info("Fetched %d MCP servers", len(self.servers))

            tools_response = await self.http_client.get(self._url("tools"))
            tools_response.raise_for_status()
            tools_data = tools_response.json()
            self.all_tools = tools_data.get("tools", [])
            logger.info("Fetched %d MCP tools", len(self.all_tools))

            # Build into locals so a repeated initialize() never accumulates
            # duplicates or keeps entries for servers that no longer exist.
            tools_by_server: Dict[str, List[Dict[str, Any]]] = {}
            openai_tools: List[Dict[str, Any]] = []
            for server in self.servers:
                try:
                    server_tools_response = await self.http_client.get(
                        self._url(f"server/{server}/tools")
                    )
                    server_tools_response.raise_for_status()
                    server_tools = server_tools_response.json().get("tools", [])
                    converted = [
                        entry
                        for entry in map(self._to_openai_tool, server_tools)
                        if entry is not None
                    ]
                    tools_by_server[server] = server_tools
                    logger.info("Server '%s' has %d tools", server, len(server_tools))
                    # Extend only after everything above succeeded, so a
                    # failing server contributes no partial tool set.
                    openai_tools.extend(converted)
                except Exception as e:
                    logger.error("Error fetching tools for server '%s': %s", server, e)
                    tools_by_server[server] = []

            self.tools_by_server = tools_by_server
            self.openai_tools = openai_tools
            logger.info("Converted %d tools to OpenAI format", len(self.openai_tools))
            return True
        except Exception as e:
            logger.error("Error initializing MCPManager client: %s", e)
            return False

    async def list_servers(self) -> List[str]:
        """
        List the IDs of the servers cached by ``initialize``.

        Returns:
            List[str]: List of server IDs
        """
        return list(self.servers)

    async def get_server_tools(self, server_id: str) -> List[Dict[str, Any]]:
        """
        Get the cached tool schemas of one server.

        Args:
            server_id: Server ID

        Returns:
            List[Dict[str, Any]]: Tool schemas, or an empty list (with a
            warning logged) if the server ID is not in the cache.
        """
        if server_id not in self.tools_by_server:
            logger.warning("Unknown server ID: %s", server_id)
            return []
        return self.tools_by_server[server_id]

    async def get_all_tools(self) -> List[Dict[str, Any]]:
        """
        Get the cached global tool list.

        Returns:
            List[Dict[str, Any]]: List of tool schemas
        """
        return self.all_tools

    @staticmethod
    def _error_payload(error: str, detail: str, full_tool_id: str) -> Dict[str, Any]:
        """Build the standard error response returned for transport-level failures."""
        return {
            "status": "error",
            "content": {
                "error": error,
                "detail": detail,
                "tool_id": full_tool_id,
            },
        }

    @staticmethod
    def _normalize_result(result: Any, full_tool_id: str) -> Dict[str, Any]:
        """Ensure a decoded tool response is a dict carrying a "status" field, logging reported errors."""
        if not isinstance(result, dict):
            # Valid JSON that is not an object (list, string, number): wrap it
            # so the caller always receives a dict with a status.
            return {"status": "success", "content": result}

        content = result.get("content")
        content_dict = content if isinstance(content, dict) else {}

        if result.get("status") == "error":
            if isinstance(content, dict):
                error_msg = content_dict.get("error", "Unknown error")
            else:
                # Error content may be a plain string; report it as-is
                # instead of failing on a missing .get().
                error_msg = str(content) if content else "Unknown error"
            error_detail = content_dict.get("detail", "")
            logger.error("Tool %s returned error status: %s", full_tool_id, error_msg)
            if error_detail:
                logger.error("Error details: %s", error_detail)
        elif "error" in content_dict:
            logger.error(
                "Response content of tool %s contains error: %s",
                full_tool_id,
                content_dict["error"],
            )
            result.setdefault("status", "error")
        else:
            result.setdefault("status", "success")
        return result

    async def call_tool(self,
                        tool_id: str,
                        arguments: Dict[str, Any],
                        server_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Call an MCP tool through the manager.

        Args:
            tool_id: Tool ID, either bare or already qualified as
                ``<server>.<tool>``.
            arguments: Tool arguments, sent as the JSON request body.
            server_id: (Optional) Server ID. When given and ``tool_id``
                contains no ".", the request targets ``<server_id>.<tool_id>``;
                otherwise ``tool_id`` is sent unchanged.

        Returns:
            Dict[str, Any]: Tool response, always includes status field
            ("success" or "error"). Timeouts, HTTP errors, request errors,
            non-JSON bodies and unexpected exceptions are all reported as
            an error dict rather than raised.
        """
        try:
            if server_id is not None and "." not in tool_id:
                full_tool_id = f"{server_id}.{tool_id}"
            else:
                full_tool_id = tool_id

            logger.info("Calling tool: %s, arguments: %s", full_tool_id, arguments)

            try:
                response = await self.http_client.post(
                    self._url(f"tool/{full_tool_id}"),
                    json=arguments,
                    timeout=self.timeout,
                )
                response.raise_for_status()
            except httpx.TimeoutException:
                # Base class of connect/read/write/pool timeouts; must be
                # caught before RequestError, which is its ancestor.
                logger.error("Call to tool %s timed out", full_tool_id)
                return self._error_payload(
                    "Tool call timeout",
                    f"Call to {full_tool_id} timed out, please try again later",
                    full_tool_id,
                )
            except httpx.HTTPStatusError as e:
                logger.error(
                    "Call to tool %s HTTP error: %s - %s",
                    full_tool_id,
                    e.response.status_code,
                    e.response.text,
                )
                return self._error_payload(
                    f"HTTP Error {e.response.status_code}",
                    e.response.text,
                    full_tool_id,
                )
            except httpx.RequestError as e:
                logger.error("Call to tool %s request error: %s", full_tool_id, e)
                return self._error_payload("Request error", str(e), full_tool_id)

            try:
                result = response.json()
            except ValueError as e:
                # json.JSONDecodeError is a ValueError: the body was not JSON.
                logger.error("Tool %s returned a non-JSON response: %s", full_tool_id, e)
                return self._error_payload("Invalid JSON response", str(e), full_tool_id)

            logger.debug("Tool call result: %s", result)
            return self._normalize_result(result, full_tool_id)
        except Exception as e:
            # Last-resort boundary: callers rely on getting a dict, not an exception.
            import traceback

            tb = traceback.format_exc()
            logger.error("Error calling tool '%s': %s", tool_id, e)
            logger.error("Error details: %s", tb)
            return {
                "status": "error",
                "content": {
                    "error": str(e),
                    "detail": "Internal error occurred while calling tool",
                    "traceback": tb,
                    "tool_id": tool_id,
                    "server_id": server_id,
                },
            }

    async def close(self):
        """Close HTTP client"""
        await self.http_client.aclose()