"""会话级 Playwright 浏览器:供渗透助手多步操作页面。"""

from __future__ import annotations

import threading
import time
from dataclasses import dataclass, field
from typing import Any

from app.core.config import (
    browser_idle_seconds,
    browser_max_snapshot_chars,
    browser_tool_enabled,
    web_fetch_timeout_seconds,
    web_fetch_user_agent,
)
from app.services.url_safety import validate_http_url
from app.services.web_fetch_runtime import html_to_text

_sessions: dict[str, "_BrowserSession"] = {}
_registry_lock = threading.Lock()

_SELECTOR_MAX = 500
_TEXT_MAX = 8000

_ALLOWED_ACTIONS = frozenset(
    {
        "navigate",
        "snapshot",
        "click",
        "fill",
        "press",
        "text",
        "close",
    }
)


def _require_playwright():
    try:
        from playwright.sync_api import sync_playwright

        return sync_playwright
    except ImportError as exc:
        raise ValueError(
            "未安装 Playwright。请执行: pip install playwright && playwright install chromium"
        ) from exc


def _truncate(text: str, limit: int) -> tuple[str, bool]:
    if len(text) <= limit:
        return text, False
    return text[:limit] + "\n…[内容已截断]", True


def _check_selector(selector: str) -> str:
    sel = str(selector or "").strip()
    if not sel:
        raise ValueError("缺少 selector")
    if len(sel) > _SELECTOR_MAX:
        raise ValueError("selector 过长")
    if "\n" in sel or "\r" in sel:
        raise ValueError("selector 格式无效")
    return sel


@dataclass
class _BrowserSession:
    thread_id: str
    lock: threading.RLock = field(default_factory=threading.RLock)
    last_used: float = field(default_factory=time.time)
    _playwright: Any = None
    _browser: Any = None
    _context: Any = None
    _page: Any = None

    def touch(self) -> None:
        self.last_used = time.time()

    def _ensure_page(self) -> Any:
        if self._page is not None:
            return self._page
        sync_playwright = _require_playwright()
        self._playwright = sync_playwright().start()
        try:
            self._browser = self._playwright.chromium.launch(headless=True)
        except Exception as exc:
            msg = str(exc)
            if "Executable doesn't exist" in msg or "playwright install" in msg.lower():
                raise ValueError(
                    "Chromium 未安装。请在服务器执行: playwright install chromium"
                ) from exc
            raise
        self._context = self._browser.new_context(
            user_agent=web_fetch_user_agent(),
            locale="zh-CN",
        )
        self._page = self._context.new_page()
        timeout_ms = web_fetch_timeout_seconds() * 1000
        self._page.set_default_timeout(timeout_ms)
        return self._page

    def close(self) -> None:
        with self.lock:
            for closer in (self._page, self._context, self._browser):
                if closer is not None:
                    try:
                        closer.close()
                    except Exception:
                        pass
            if self._playwright is not None:
                try:
                    self._playwright.stop()
                except Exception:
                    pass
            self._page = None
            self._context = None
            self._browser = None
            self._playwright = None


def _cleanup_idle_sessions() -> None:
    now = time.time()
    stale: list[str] = []
    with _registry_lock:
        for key, sess in _sessions.items():
            if now - sess.last_used > browser_idle_seconds():
                stale.append(key)
        for key in stale:
            sess = _sessions.pop(key, None)
            if sess:
                sess.close()


def _get_session(thread_id: str) -> _BrowserSession:
    tid = (thread_id or "").strip()
    if not tid:
        raise ValueError("缺少会话 thread_id,无法使用浏览器")
    _cleanup_idle_sessions()
    with _registry_lock:
        sess = _sessions.get(tid)
        if sess is None:
            sess = _BrowserSession(thread_id=tid)
            _sessions[tid] = sess
        sess.touch()
        return sess


def close_browser_session(thread_id: str) -> None:
    tid = (thread_id or "").strip()
    if not tid:
        return
    with _registry_lock:
        sess = _sessions.pop(tid, None)
    if sess:
        sess.close()


def _page_snapshot(page: Any) -> str:
    limit = browser_max_snapshot_chars()
    try:
        snap = page.locator("body").aria_snapshot()
        if snap:
            return _truncate(str(snap), limit)[0]
    except Exception:
        pass
    try:
        snap = page.evaluate(
            """() => {
              const title = document.title || '';
              const links = [...document.querySelectorAll('a')]
                .slice(0, 40)
                .map(a => `- ${(a.innerText||'').trim().slice(0,60)} -> ${a.href}`)
                .join('\\n');
              return `Title: ${title}\\n\\nLinks:\\n${links}`;
            }"""
        )
        if snap:
            return _truncate(str(snap), limit)[0]
    except Exception:
        pass
    return _truncate(html_to_text(page.content()), limit)[0]


def browser_act(
    thread_id: str,
    action: str,
    *,
    url: str | None = None,
    selector: str | None = None,
    text: str | None = None,
    key: str | None = None,
) -> dict[str, Any]:
    if not browser_tool_enabled():
        raise ValueError("浏览器工具未启用(设置 COMPILOT_BROWSER_ENABLED=true)")

    act = (action or "").strip().lower()
    if act not in _ALLOWED_ACTIONS:
        raise ValueError(f"不支持的 action: {action},可选: {', '.join(sorted(_ALLOWED_ACTIONS))}")

    if act == "close":
        close_browser_session(thread_id)
        return {"ok": True, "action": "close", "message": "浏览器已关闭"}

    sess = _get_session(thread_id)
    timeout_ms = web_fetch_timeout_seconds() * 1000

    with sess.lock:
        sess.touch()
        page = sess._ensure_page()

        if act == "navigate":
            safe_url = validate_http_url(str(url or ""))
            page.goto(safe_url, wait_until="domcontentloaded", timeout=timeout_ms)
            try:
                page.wait_for_load_state("networkidle", timeout=min(timeout_ms, 15_000))
            except Exception:
                pass
            return {
                "ok": True,
                "action": "navigate",
                "url": safe_url,
                "title": page.title(),
                "final_url": page.url,
            }

        if act == "snapshot":
            content, truncated = _truncate(_page_snapshot(page), browser_max_snapshot_chars())
            return {
                "ok": True,
                "action": "snapshot",
                "title": page.title(),
                "url": page.url,
                "snapshot": content,
                "truncated": truncated,
            }

        if act == "text":
            body = page.inner_text("body")
            content, truncated = _truncate(body, browser_max_snapshot_chars())
            return {
                "ok": True,
                "action": "text",
                "title": page.title(),
                "url": page.url,
                "text": content,
                "truncated": truncated,
            }

        if act == "click":
            sel = _check_selector(str(selector or ""))
            page.locator(sel).first.click(timeout=timeout_ms)
            return {
                "ok": True,
                "action": "click",
                "selector": sel,
                "title": page.title(),
                "url": page.url,
            }

        if act == "fill":
            sel = _check_selector(str(selector or ""))
            value = str(text or "")
            if not value:
                raise ValueError("fill 需要 text")
            if len(value) > _TEXT_MAX:
                raise ValueError("text 过长")
            page.locator(sel).first.fill(value, timeout=timeout_ms)
            return {
                "ok": True,
                "action": "fill",
                "selector": sel,
                "title": page.title(),
                "url": page.url,
            }

        if act == "press":
            k = str(key or "").strip()
            if not k:
                raise ValueError("press 需要 key")
            if len(k) > 50:
                raise ValueError("key 过长")
            page.keyboard.press(k)
            return {
                "ok": True,
                "action": "press",
                "key": k,
                "title": page.title(),
                "url": page.url,
            }

    raise ValueError(f"未处理的 action: {action}")