"""网页抓取:静态 HTTP 或 Playwright 渲染后提取正文。"""
from __future__ import annotations
import gzip
import html
import re
import ssl
import urllib.error
import urllib.request
from typing import Any
from app.core.config import (
web_fetch_js_enabled,
web_fetch_max_content_chars,
web_fetch_timeout_seconds,
web_fetch_tool_enabled,
web_fetch_user_agent,
)
from app.services.url_safety import validate_http_url
# Matches a single tag-like span "<...>" (anything between "<" and the next ">").
_TAG_RE = re.compile(r"<[^>]+>")

# Matches a complete <script>, <style> or <noscript> element together with its
# content. DOTALL lets ".*?" span line breaks; IGNORECASE accepts any tag casing.
_SCRIPT_STYLE_RE = re.compile(
    r"<(script|style|noscript)[^>]*>.*?</\1>",
    flags=re.DOTALL | re.IGNORECASE,
)
def _browser_headers() -> dict[str, str]:
    """Return the browser-like HTTP request headers used for static fetches.

    The User-Agent value is read from configuration via
    ``web_fetch_user_agent()``; every other header is a fixed string.
    """
    accepted_types = (
        "text/html,application/xhtml+xml,application/xml;q=0.9,"
        "image/avif,image/webp,*/*;q=0.8"
    )
    headers: dict[str, str] = {"User-Agent": web_fetch_user_agent()}
    headers["Accept"] = accepted_types
    headers["Accept-Language"] = "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7"
    headers["Cache-Control"] = "no-cache"
    headers["Upgrade-Insecure-Requests"] = "1"
    return headers
def _read_body(body: bytes, encoding: str | None) -> str:
for codec in (encoding, "utf-8", "gbk", "latin-1"):
if not codec:
continue
try:
return body.decode(codec)
except (LookupError, UnicodeDecodeError):
continue
return body.decode("utf-8", errors="replace")
def _truncate(text: str, limit: int) -> tuple[str, bool]:
    """Cut ``text`` down to at most ``limit`` characters plus a marker.

    Args:
        text: The text to shorten.
        limit: Maximum number of characters of ``text`` to keep.

    Returns:
        ``(result, truncated)``. When ``text`` fits within ``limit`` it is
        returned unchanged with ``False``; otherwise the first ``limit``
        characters followed by a truncation marker line are returned with
        ``True``.
    """
    fits = len(text) <= limit
    if fits:
        return text, False
    shortened = text[:limit] + "\n…[内容已截断]"
    return shortened, True
def html_to_text(page_html: str) -> str:
    """Reduce an HTML document to plain text.

    Script/style/noscript elements are dropped together with their content,
    remaining tags are replaced by spaces, HTML entities are unescaped, and
    whitespace is tidied up.

    Args:
        page_html: The HTML markup to convert.

    Returns:
        The extracted text with leading and trailing whitespace removed.
    """
    without_scripts = _SCRIPT_STYLE_RE.sub(" ", page_html)
    without_tags = _TAG_RE.sub(" ", without_scripts)
    plain = html.unescape(without_tags)

    # Applied in this order: strip spaces/tabs before a newline, collapse runs
    # of three or more newlines to one blank line, squeeze repeated spaces/tabs.
    whitespace_rules = (
        (r"[ \t]+\n", "\n"),
        (r"\n{3,}", "\n\n"),
        (r"[ \t]{2,}", " "),
    )
    for pattern, replacement in whitespace_rules:
        plain = re.sub(pattern, replacement, plain)
    return plain.strip()
class _ValidatingRedirectHandler(urllib.request.HTTPRedirectHandler):
    """Redirect handler that re-validates every redirect target before following it."""

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        # Only the initial URL is validated by the caller; without this check a
        # permitted URL could redirect the fetch to a target that validation
        # would have rejected.
        # NOTE(review): assumes validate_http_url raises for disallowed targets
        # and returns the URL to use, as web_fetch relies on — confirm.
        checked_url = validate_http_url(newurl)
        return super().redirect_request(req, fp, code, msg, headers, checked_url)


def _fetch_static(url: str, *, timeout: int) -> tuple[str, str | None]:
    """Fetch a page with a plain HTTP GET and return its decoded markup.

    Args:
        url: The URL to request.
        timeout: Timeout in seconds passed to the opener.

    Returns:
        ``(page_html, final_url)`` where ``final_url`` is the URL reported by
        the response after any redirects.

    Raises:
        urllib.error.HTTPError: The server answered with an error status.
        urllib.error.URLError: The request could not be completed.
        Exception: Whatever ``validate_http_url`` raises for a rejected
            redirect target.
    """
    import zlib  # local import: only needed for the decompression error type

    req = urllib.request.Request(url, headers=_browser_headers(), method="GET")
    ctx = ssl.create_default_context()
    opener = urllib.request.build_opener(
        urllib.request.HTTPSHandler(context=ctx),
        _ValidatingRedirectHandler(),
    )
    with opener.open(req, timeout=timeout) as resp:
        body = resp.read()
        charset = resp.headers.get_content_charset()
        # Content-coding names are case-insensitive; normalise before comparing.
        content_encoding = (resp.headers.get("Content-Encoding") or "").strip().lower()
        final_url = resp.geturl()

    if content_encoding in ("gzip", "x-gzip"):
        try:
            body = gzip.decompress(body)
        except (OSError, EOFError, zlib.error):
            # Best effort: a mislabelled, truncated or corrupt stream falls
            # back to the raw bytes instead of failing the whole fetch.
            pass
    return _read_body(body, charset), final_url
def _fetch_playwright(url: str, *, timeout: int) -> tuple[str, str | None]:
    """Load a page in headless Chromium via Playwright and return its markup.

    Args:
        url: The URL to navigate to.
        timeout: Timeout in seconds; converted to milliseconds for Playwright.

    Returns:
        ``(page_html, final_url)`` where ``page_html`` is the page content
        after loading and ``final_url`` is the page's URL at that point.

    Raises:
        ValueError: Playwright is not installed.
    """
    try:
        from playwright.sync_api import sync_playwright
    except ImportError as exc:
        raise ValueError(
            "未安装 Playwright。请执行: pip install playwright && playwright install chromium"
        ) from exc

    nav_timeout_ms = timeout * 1000
    # The extra wait for network idle is capped at 15 seconds.
    idle_timeout_ms = min(nav_timeout_ms, 15_000)

    rendered = ""
    landed_url: str | None = url
    with sync_playwright() as driver:
        chromium = driver.chromium.launch(headless=True)
        try:
            session = chromium.new_context(
                user_agent=web_fetch_user_agent(),
                locale="zh-CN",
            )
            tab = session.new_page()
            tab.set_default_timeout(nav_timeout_ms)
            tab.goto(url, wait_until="domcontentloaded", timeout=nav_timeout_ms)
            try:
                tab.wait_for_load_state("networkidle", timeout=idle_timeout_ms)
            except Exception:
                # Best effort: if the page never goes idle, use what has loaded.
                pass
            rendered, landed_url = tab.content(), tab.url
        finally:
            # Always shut the browser down, even when navigation fails.
            chromium.close()
    return rendered, landed_url
def web_fetch(
    url: str,
    *,
    render_js: bool | None = None,
    extract: str = "text",
) -> dict[str, Any]:
    """Fetch a web page and return its content as text or raw HTML.

    Args:
        url: The page URL; it is passed through ``validate_http_url`` first.
        render_js: ``True`` to render with Playwright, ``False`` for a plain
            HTTP fetch, ``None`` to follow the ``web_fetch_js_enabled()``
            setting.
        extract: ``"text"`` for extracted plain text or ``"html"`` for the
            markup itself. Case and surrounding whitespace are ignored; an
            empty value is treated as ``"text"``.

    Returns:
        A dict with the keys ``url`` (validated URL), ``final_url`` (URL after
        loading, falling back to ``url``), ``mode`` (``"playwright"`` or
        ``"http"``), ``extract``, ``content`` (possibly truncated),
        ``content_length`` (length of the returned ``content``) and
        ``truncated``.

    Raises:
        ValueError: The tool or JS rendering is disabled, ``extract`` is not
            supported, or the fetch failed.
    """
    if not web_fetch_tool_enabled():
        raise ValueError("网页抓取未启用(设置 COMPILOT_WEB_FETCH_ENABLED=true)")
    safe_url = validate_http_url(url)
    timeout = web_fetch_timeout_seconds()
    max_chars = web_fetch_max_content_chars()
    use_js = render_js if render_js is not None else web_fetch_js_enabled()
    if use_js and not web_fetch_js_enabled():
        raise ValueError("JS 渲染未启用(设置 COMPILOT_WEB_FETCH_JS_ENABLED=true)")

    # Reject an unsupported extract mode before any network or browser work,
    # so a bad argument does not cost a full page fetch.
    extract_mode = (extract or "text").strip().lower()
    if extract_mode not in ("text", "html"):
        raise ValueError("extract 仅支持 text 或 html")

    mode = "playwright" if use_js else "http"
    try:
        if use_js:
            page_html, final_url = _fetch_playwright(safe_url, timeout=timeout)
        else:
            page_html, final_url = _fetch_static(safe_url, timeout=timeout)
    except urllib.error.HTTPError as exc:
        # HTTPError is a URLError subclass, so it has to be handled first.
        raise ValueError(f"页面返回 HTTP {exc.code}") from exc
    except urllib.error.URLError as exc:
        raise ValueError(f"无法访问页面: {exc.reason}") from exc
    except Exception as exc:
        # Messages that already mention Playwright are passed through as-is.
        if use_js and "playwright" in str(exc).lower():
            raise ValueError(str(exc)) from exc
        raise ValueError(f"抓取失败: {exc}") from exc

    content = page_html if extract_mode == "html" else html_to_text(page_html)
    content, truncated = _truncate(content, max_chars)
    return {
        "url": safe_url,
        "final_url": final_url or safe_url,
        "mode": mode,
        "extract": extract_mode,
        "content": content,
        "content_length": len(content),
        "truncated": truncated,
    }