"""AI 工具执行器。"""
from __future__ import annotations
import json
from typing import Any
from app.core.context import get_current_thread_id, get_current_user, get_user_id
from app.services.browser_runtime import browser_act
from app.services import knowledge_store
from app.services.bing_search_runtime import bing_search
from app.services.web_fetch_runtime import web_fetch
from app.services.terminal_runtime import TERMINAL_WORKSPACE, run_terminal_command
from app.version import __version__
def execute_tool(tool_id: str, args: dict[str, Any] | None = None) -> str:
args = args or {}
if tool_id == "get_platform_info":
return json.dumps(
{
"name": "ComPilot Scan",
"version": __version__,
"capabilities": [
"渗透测试 AI 对话",
"知识库查询与保存(报告/漏洞/目标/笔记)",
"沙箱终端命令执行(terminal_run)",
"Bing 联网搜索(bing_search)",
"网页抓取与 JS 渲染(web_fetch)",
"Playwright 浏览器多步操作(browser)",
"会话历史与流式回复",
],
"kb_categories": sorted(knowledge_store.KB_CATEGORIES),
"terminal_workspace": str(TERMINAL_WORKSPACE),
},
ensure_ascii=False,
)
if tool_id == "kb_list":
items = knowledge_store.list_entries(
category=args.get("category"),
keyword=args.get("keyword"),
limit=int(args.get("limit") or 20),
)
return json.dumps({"items": items, "count": len(items)}, ensure_ascii=False)
if tool_id == "kb_search":
kw = str(args.get("keyword") or "").strip()
if not kw:
return json.dumps({"error": "缺少 keyword"}, ensure_ascii=False)
items = knowledge_store.search_entries(kw, limit=int(args.get("limit") or 15))
return json.dumps({"items": items, "count": len(items)}, ensure_ascii=False)
if tool_id == "kb_get":
eid = str(args.get("entry_id") or "").strip()
if not eid:
return json.dumps({"error": "缺少 entry_id"}, ensure_ascii=False)
entry = knowledge_store.get_entry(eid, include_content=True)
if not entry:
return json.dumps({"error": f"未找到条目 {eid}"}, ensure_ascii=False)
return json.dumps(entry, ensure_ascii=False)
if tool_id == "kb_create":
try:
user = get_current_user()
created_by = str((user or {}).get("username") or get_user_id() or "ai")
entry = knowledge_store.create_entry(
str(args.get("title") or ""),
str(args.get("content") or ""),
category=str(args.get("category") or "general"),
tags=args.get("tags") if isinstance(args.get("tags"), list) else None,
summary=str(args.get("summary") or ""),
created_by=created_by,
)
return json.dumps({"ok": True, "entry": entry}, ensure_ascii=False)
except ValueError as exc:
return json.dumps({"error": str(exc)}, ensure_ascii=False)
if tool_id == "kb_update":
eid = str(args.get("entry_id") or "").strip()
if not eid:
return json.dumps({"error": "缺少 entry_id"}, ensure_ascii=False)
patch = {k: v for k, v in args.items() if k != "entry_id" and v is not None}
try:
entry = knowledge_store.update_entry(eid, patch)
return json.dumps({"ok": True, "entry": entry}, ensure_ascii=False)
except ValueError as exc:
return json.dumps({"error": str(exc)}, ensure_ascii=False)
if tool_id == "browser":
thread_id = get_current_thread_id()
if not thread_id:
return json.dumps(
{"error": "浏览器工具需在对话会话内使用"},
ensure_ascii=False,
)
try:
result = browser_act(
thread_id,
str(args.get("action") or ""),
url=args.get("url"),
selector=args.get("selector"),
text=args.get("text"),
key=args.get("key"),
)
return json.dumps(result, ensure_ascii=False)
except ValueError as exc:
return json.dumps({"error": str(exc)}, ensure_ascii=False)
if tool_id == "web_fetch":
page_url = str(args.get("url") or "").strip()
if not page_url:
return json.dumps({"error": "缺少 url"}, ensure_ascii=False)
render_js = args.get("render_js")
if render_js is not None:
render_js = bool(render_js)
try:
result = web_fetch(
page_url,
render_js=render_js,
extract=str(args.get("extract") or "text"),
)
return json.dumps(result, ensure_ascii=False)
except ValueError as exc:
return json.dumps({"error": str(exc)}, ensure_ascii=False)
if tool_id == "bing_search":
q = str(args.get("query") or "").strip()
if not q:
return json.dumps({"error": "缺少 query"}, ensure_ascii=False)
try:
result = bing_search(
q,
num_results=args.get("num_results"),
market=args.get("market"),
)
return json.dumps(result, ensure_ascii=False)
except ValueError as exc:
return json.dumps({"error": str(exc)}, ensure_ascii=False)
if tool_id == "terminal_run":
cmd = str(args.get("command") or "").strip()
if not cmd:
return json.dumps({"error": "缺少 command"}, ensure_ascii=False)
try:
result = run_terminal_command(
cmd,
cwd=args.get("cwd"),
timeout_seconds=args.get("timeout_seconds"),
)
return json.dumps(result, ensure_ascii=False)
except ValueError as exc:
return json.dumps({"error": str(exc)}, ensure_ascii=False)
if tool_id == "kb_delete":
eid = str(args.get("entry_id") or "").strip()
if not eid:
return json.dumps({"error": "缺少 entry_id"}, ensure_ascii=False)
try:
knowledge_store.delete_entry(eid)
return json.dumps({"ok": True, "entry_id": eid}, ensure_ascii=False)
except ValueError as exc:
return json.dumps({"error": str(exc)}, ensure_ascii=False)
return json.dumps({"error": f"未知工具: {tool_id}"}, ensure_ascii=False)