"""会话级 Playwright 浏览器:供渗透助手多步操作页面。"""
from __future__ import annotations
import threading
import time
from dataclasses import dataclass, field
from typing import Any
from app.core.config import (
browser_idle_seconds,
browser_max_snapshot_chars,
browser_tool_enabled,
web_fetch_timeout_seconds,
web_fetch_user_agent,
)
from app.services.url_safety import validate_http_url
from app.services.web_fetch_runtime import html_to_text
# Live browser sessions keyed by conversation thread id. Lookups, insertions
# and removals on this registry are done while holding ``_registry_lock``.
_sessions: dict[str, "_BrowserSession"] = {}
_registry_lock = threading.Lock()

# Maximum accepted length (in characters) of a selector and of text typed
# into a field, respectively.
_SELECTOR_MAX = 500
_TEXT_MAX = 8000

# The complete set of action names the browser tool accepts.
_ALLOWED_ACTIONS = frozenset(
    ("navigate", "snapshot", "click", "fill", "press", "text", "close")
)
def _require_playwright():
    """Return Playwright's ``sync_playwright`` factory, importing it lazily.

    The import happens inside the function so this module can be loaded even
    when the ``playwright`` package is not installed.

    Raises:
        ValueError: If ``playwright`` cannot be imported. The message carries
            the install command and the original ``ImportError`` is chained.
    """
    try:
        from playwright.sync_api import sync_playwright
    except ImportError as exc:
        raise ValueError(
            "未安装 Playwright。请执行: pip install playwright && playwright install chromium"
        ) from exc
    return sync_playwright
def _truncate(text: str, limit: int) -> tuple[str, bool]:
    """Cap *text* at *limit* characters.

    Returns:
        A ``(text, truncated)`` pair. When the input is longer than *limit*
        the first *limit* characters are kept and a truncation marker line is
        appended (so the result is longer than *limit*); otherwise the input
        is returned unchanged with ``False``.
    """
    overflow = len(text) > limit
    if overflow:
        return text[:limit] + "\n…[内容已截断]", True
    return text, False
def _check_selector(selector: str) -> str:
    """Normalise and validate a caller-supplied selector.

    Returns:
        The selector with surrounding whitespace stripped.

    Raises:
        ValueError: If the selector is empty, longer than ``_SELECTOR_MAX``
            characters, or contains a carriage return or line feed.
    """
    cleaned = str(selector or "").strip()
    if cleaned == "":
        raise ValueError("缺少 selector")
    if len(cleaned) > _SELECTOR_MAX:
        raise ValueError("selector 过长")
    if any(ch in cleaned for ch in ("\n", "\r")):
        raise ValueError("selector 格式无效")
    return cleaned
@dataclass
class _BrowserSession:
    """A lazily started headless Chromium page bound to one thread id.

    The Playwright driver, browser, context and page are created on the first
    call to ``_ensure_page`` and released together by ``close``.

    NOTE(review): Playwright's sync API is documented as not thread-safe;
    ``lock`` only serialises access. Confirm that callers drive a given
    session from a single OS thread.
    """

    # Conversation/thread identifier this session belongs to.
    thread_id: str
    # Re-entrant so ``close`` can be called while the caller already holds it.
    lock: threading.RLock = field(default_factory=threading.RLock)
    # ``time.time()`` of the most recent use; refreshed by ``touch``.
    last_used: float = field(default_factory=time.time)
    # Playwright handles; all ``None`` until ``_ensure_page`` succeeds.
    _playwright: Any = None
    _browser: Any = None
    _context: Any = None
    _page: Any = None

    def touch(self) -> None:
        """Record that the session was used just now."""
        self.last_used = time.time()

    def _ensure_page(self) -> Any:
        """Return the session's page, starting the browser on first use.

        Returns:
            The existing page if one is open, otherwise a freshly created one
            with the configured user agent, ``zh-CN`` locale and default
            timeout applied.

        Raises:
            ValueError: If Playwright is not installed, or if Chromium's
                executable is missing (detected from the launch error text).
            Exception: Any other Playwright start-up error is re-raised.

        On any failure every handle created so far is released, so a failed
        start never leaves a running driver or browser behind.
        """
        if self._page is not None:
            return self._page

        sync_playwright = _require_playwright()
        try:
            self._playwright = sync_playwright().start()
            try:
                self._browser = self._playwright.chromium.launch(headless=True)
            except Exception as exc:
                msg = str(exc)
                if "Executable doesn't exist" in msg or "playwright install" in msg.lower():
                    raise ValueError(
                        "Chromium 未安装。请在服务器执行: playwright install chromium"
                    ) from exc
                raise
            self._context = self._browser.new_context(
                user_agent=web_fetch_user_agent(),
                locale="zh-CN",
            )
            self._page = self._context.new_page()
            timeout_ms = web_fetch_timeout_seconds() * 1000
            self._page.set_default_timeout(timeout_ms)
        except BaseException:
            # Start-up is all-or-nothing: stop the driver and close whatever
            # was opened, then let the original error propagate.
            self.close()
            raise
        return self._page

    def close(self) -> None:
        """Release every Playwright handle and reset the session to unstarted.

        Teardown is best-effort: an error while closing one handle is ignored
        so the remaining handles are still released.
        """
        with self.lock:
            # Page, then context, then browser.
            for handle in (self._page, self._context, self._browser):
                if handle is None:
                    continue
                try:
                    handle.close()
                except Exception:
                    pass
            if self._playwright is not None:
                try:
                    self._playwright.stop()
                except Exception:
                    pass
            self._page = None
            self._context = None
            self._browser = None
            self._playwright = None
def _cleanup_idle_sessions() -> None:
    """Evict and close every session idle for longer than the configured limit.

    A session is stale when ``now - last_used`` exceeds
    ``browser_idle_seconds()``. Stale entries are removed from the registry
    while holding ``_registry_lock``; the sessions themselves are closed only
    after the lock is released, so a slow browser teardown (or waiting for a
    session that is still mid-action) does not block other registry users.
    """
    now = time.time()
    idle_limit = browser_idle_seconds()  # read once, not once per session
    with _registry_lock:
        stale_keys = [
            key for key, sess in _sessions.items() if now - sess.last_used > idle_limit
        ]
        evicted = [_sessions.pop(key) for key in stale_keys]
    for sess in evicted:
        sess.close()
def _get_session(thread_id: str) -> _BrowserSession:
    """Return the session for *thread_id*, creating and registering it if new.

    Idle sessions are swept first. The returned session has its ``last_used``
    timestamp refreshed.

    Raises:
        ValueError: If *thread_id* is empty or only whitespace.
    """
    tid = (thread_id or "").strip()
    if tid == "":
        raise ValueError("缺少会话 thread_id,无法使用浏览器")

    _cleanup_idle_sessions()

    with _registry_lock:
        try:
            session = _sessions[tid]
        except KeyError:
            session = _sessions[tid] = _BrowserSession(thread_id=tid)
        session.touch()
    return session
def close_browser_session(thread_id: str) -> None:
    """Remove the session registered for *thread_id* and close it.

    Does nothing when *thread_id* is empty/whitespace or when no session is
    registered under it.
    """
    tid = (thread_id or "").strip()
    if tid:
        with _registry_lock:
            session = _sessions.pop(tid, None)
        if session:
            session.close()
def _page_snapshot(page: Any) -> str:
    """Return a text summary of *page*, capped at the configured size.

    Three strategies are tried in order; the first one that yields a
    non-empty result wins:

    1. The ARIA snapshot of ``<body>``.
    2. An in-page script that reports the title and up to 40 links.
    3. The page's HTML converted to plain text (always used as last resort).

    Errors raised by the first two strategies are ignored on purpose so the
    next strategy can run.
    """
    limit = browser_max_snapshot_chars()

    # In-page fallback script; the literal is kept verbatim.
    links_script = """() => {
const title = document.title || '';
const links = [...document.querySelectorAll('a')]
.slice(0, 40)
.map(a => `- ${(a.innerText||'').trim().slice(0,60)} -> ${a.href}`)
.join('\\n');
return `Title: ${title}\\n\\nLinks:\\n${links}`;
}"""

    strategies = (
        lambda: page.locator("body").aria_snapshot(),
        lambda: page.evaluate(links_script),
    )
    for produce in strategies:
        try:
            candidate = produce()
            if candidate:
                return _truncate(str(candidate), limit)[0]
        except Exception:
            continue

    return _truncate(html_to_text(page.content()), limit)[0]
def browser_act(
    thread_id: str,
    action: str,
    *,
    url: str | None = None,
    selector: str | None = None,
    text: str | None = None,
    key: str | None = None,
) -> dict[str, Any]:
    """Perform one browser action in the session belonging to *thread_id*.

    Args:
        thread_id: Identifies which browser session to use (created on demand).
        action: One of ``_ALLOWED_ACTIONS``; case and surrounding whitespace
            are ignored.
        url: Target for ``navigate``; passed through ``validate_http_url``.
        selector: Element selector for ``click`` and ``fill``.
        text: Non-empty value for ``fill`` (at most ``_TEXT_MAX`` characters).
        key: Key name for ``press`` (at most 50 characters after stripping).

    Returns:
        A dict with ``ok`` and ``action`` plus action-specific fields such as
        ``title``, ``url``, ``final_url``, ``snapshot``, ``text``,
        ``truncated``, ``selector`` or ``key``.

    Raises:
        ValueError: If the tool is disabled, the action is unknown, the
            thread id is missing, or an action argument is missing/invalid.
        Exception: Errors raised by Playwright itself propagate unchanged.

    All argument validation happens before the browser is started, so a
    malformed request never launches Chromium.

    NOTE(review): only the requested URL is validated. Redirects and
    navigations triggered by ``click``/``press`` are not re-checked here.
    Confirm whether that is acceptable for the deployment.
    """
    if not browser_tool_enabled():
        raise ValueError("浏览器工具未启用(设置 COMPILOT_BROWSER_ENABLED=true)")

    act = (action or "").strip().lower()
    if act not in _ALLOWED_ACTIONS:
        raise ValueError(f"不支持的 action: {action},可选: {', '.join(sorted(_ALLOWED_ACTIONS))}")

    if act == "close":
        close_browser_session(thread_id)
        return {"ok": True, "action": "close", "message": "浏览器已关闭"}

    sess = _get_session(thread_id)
    timeout_ms = web_fetch_timeout_seconds() * 1000

    # Validate the action's arguments up front; nothing below this block
    # raises ValueError for bad caller input.
    safe_url = sel = value = key_name = None
    if act == "navigate":
        safe_url = validate_http_url(str(url or ""))
    elif act in ("click", "fill"):
        sel = _check_selector(str(selector or ""))
        if act == "fill":
            value = str(text or "")
            if not value:
                raise ValueError("fill 需要 text")
            if len(value) > _TEXT_MAX:
                raise ValueError("text 过长")
    elif act == "press":
        key_name = str(key or "").strip()
        if not key_name:
            raise ValueError("press 需要 key")
        if len(key_name) > 50:
            raise ValueError("key 过长")

    with sess.lock:
        sess.touch()
        page = sess._ensure_page()

        if act == "navigate":
            page.goto(safe_url, wait_until="domcontentloaded", timeout=timeout_ms)
            try:
                # Best-effort wait for the network to go quiet, capped at
                # 15 s; a timeout here is not treated as a failure.
                page.wait_for_load_state("networkidle", timeout=min(timeout_ms, 15_000))
            except Exception:
                pass
            return {
                "ok": True,
                "action": "navigate",
                "url": safe_url,
                "title": page.title(),
                "final_url": page.url,
            }

        if act == "snapshot":
            content, truncated = _truncate(_page_snapshot(page), browser_max_snapshot_chars())
            return {
                "ok": True,
                "action": "snapshot",
                "title": page.title(),
                "url": page.url,
                "snapshot": content,
                "truncated": truncated,
            }

        if act == "text":
            body = page.inner_text("body")
            content, truncated = _truncate(body, browser_max_snapshot_chars())
            return {
                "ok": True,
                "action": "text",
                "title": page.title(),
                "url": page.url,
                "text": content,
                "truncated": truncated,
            }

        if act == "click":
            page.locator(sel).first.click(timeout=timeout_ms)
            return {
                "ok": True,
                "action": "click",
                "selector": sel,
                "title": page.title(),
                "url": page.url,
            }

        if act == "fill":
            page.locator(sel).first.fill(value, timeout=timeout_ms)
            return {
                "ok": True,
                "action": "fill",
                "selector": sel,
                "title": page.title(),
                "url": page.url,
            }

        if act == "press":
            page.keyboard.press(key_name)
            return {
                "ok": True,
                "action": "press",
                "key": key_name,
                "title": page.title(),
                "url": page.url,
            }

    # Defensive: only reachable if an action is added to _ALLOWED_ACTIONS
    # without a matching branch above.
    raise ValueError(f"未处理的 action: {action}")