"""Bing 联网搜索:模拟浏览器请求并解析搜索结果。"""
from __future__ import annotations
import base64
import gzip
import html
import re
import ssl
import urllib.error
import urllib.parse
import urllib.request
from typing import Any
from app.core.config import (
bing_search_max_results,
bing_search_timeout_seconds,
bing_search_tool_enabled,
bing_search_user_agent,
)
# Endpoint of Bing's HTML search results page.
_BING_SEARCH_URL = "https://www.bing.com/search"

# Static headers sent with every search request so it resembles a top-level
# browser navigation. `_fetch_bing_html` adds User-Agent and Referer per call.
_BROWSER_HEADERS_BASE = {
    "Accept": (
        "text/html,application/xhtml+xml,application/xml;q=0.9,"
        "image/avif,image/webp,image/apng,*/*;q=0.8"
    ),
    "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
    "Cache-Control": "no-cache",
    "Pragma": "no-cache",
    "Upgrade-Insecure-Requests": "1",
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "none",
    "Sec-Fetch-User": "?1",
}

# Flags shared by the HTML-scraping patterns: tag names are matched
# case-insensitively and `.` is allowed to span line breaks.
_HTML_RE_FLAGS = re.IGNORECASE | re.DOTALL

# One result block: an <li> whose attributes contain the word `b_algo`.
# Group 1 is the inner HTML up to the first closing </li>.
_ALGO_BLOCK_RE = re.compile(r"<li[^>]*\bb_algo\b[^>]*>(.*?)</li>", _HTML_RE_FLAGS)

# Title link: an <a> that directly follows an <h2> opening tag.
# Group 1 is the double-quoted href value, group 2 the anchor's inner HTML.
_LINK_RE = re.compile(
    r'<h2[^>]*>\s*<a[^>]+href="([^"]+)"[^>]*>(.*?)</a>',
    _HTML_RE_FLAGS,
)

# Snippet: a <p> whose attributes contain a token starting with `b_lineclamp`.
# Group 1 is the paragraph's inner HTML.
_SNIPPET_RE = re.compile(r"<p[^>]*\bb_lineclamp[^>]*>(.*?)</p>", _HTML_RE_FLAGS)

# Any single markup tag, used to reduce inner HTML to plain text.
_TAG_RE = re.compile(r"<[^>]+>")
def _strip_tags(text: str) -> str:
    """Return *text* as plain text.

    Markup tags are removed first, then HTML entities are decoded, and
    finally leading/trailing whitespace is trimmed.
    """
    without_tags = _TAG_RE.sub("", text)
    decoded = html.unescape(without_tags)
    return decoded.strip()
def _unwrap_bing_redirect(href: str) -> str:
    """Resolve a Bing click-tracking link to the URL it points at.

    Links containing ``bing.com/ck/`` or ``bing.com/aclick`` carry the real
    destination in their ``u`` query parameter, either as ``a1`` followed by
    URL-safe base64 (padding stripped) or as a plain ``http(s)`` URL.

    Args:
        href: The link taken from a search result.

    Returns:
        The destination URL when it can be recovered and is an ``http://`` or
        ``https://`` URL; otherwise *href* unchanged. This function never
        raises for malformed redirect payloads.
    """
    if "bing.com/ck/" not in href and "bing.com/aclick" not in href:
        return href

    try:
        query = urllib.parse.parse_qs(urllib.parse.urlparse(href).query)
        raw_u = (query.get("u") or [None])[0]
        if not raw_u:
            return href

        # parse_qs has already percent-decoded the value once. Decode a second
        # time only when the first pass did not produce a recognizable payload
        # (double-encoded link); otherwise literal escapes such as "%20"
        # inside an already-decoded URL would be corrupted.
        payload = str(raw_u)
        if not payload.startswith(("a1", "http://", "https://")):
            payload = urllib.parse.unquote(payload)

        if payload.startswith("a1"):
            encoded = payload[2:]
            # Restore the base64 padding that is stripped from the link.
            encoded += "=" * (-len(encoded) % 4)
            candidate = base64.urlsafe_b64decode(encoded).decode("utf-8")
        else:
            candidate = payload
    except ValueError:
        # Covers malformed URLs, invalid base64 (binascii.Error) and
        # non-UTF-8 payloads (UnicodeDecodeError): keep the original link.
        return href

    # Only accept web URLs; anything else (garbage, other schemes) is not a
    # usable search-result target, so fall back to the original link.
    if candidate.startswith(("http://", "https://")):
        return candidate
    return href
def _read_response(body: bytes, encoding: str | None, headers: Any) -> str:
enc = (encoding or "").lower()
if enc == "gzip" or (isinstance(headers, dict) and headers.get("Content-Encoding") == "gzip"):
try:
body = gzip.decompress(body)
except OSError:
pass
for codec in (enc, "utf-8", "gbk", "latin-1"):
if not codec or codec in ("gzip", "br", "deflate"):
continue
try:
return body.decode(codec)
except (LookupError, UnicodeDecodeError):
continue
return body.decode("utf-8", errors="replace")
def _parse_results(page_html: str, *, limit: int) -> list[dict[str, str]]:
    """Extract organic search results from a Bing results page.

    Args:
        page_html: HTML of the results page.
        limit: Scanning stops once this many results have been collected.

    Returns:
        A list of ``{"title", "url", "snippet"}`` dicts in page order.
        Blocks without a title link, with an empty title or URL, or whose
        URL was already collected are skipped. ``snippet`` is ``""`` when the
        block has no snippet paragraph.
    """
    collected: list[dict[str, str]] = []
    visited: set[str] = set()

    for block_match in _ALGO_BLOCK_RE.finditer(page_html):
        inner = block_match.group(1)

        anchor = _LINK_RE.search(inner)
        if anchor is None:
            continue

        raw_href, raw_title = anchor.group(1), anchor.group(2)
        # The href is attribute text, so entities such as &amp; must be
        # decoded before the redirect wrapper is inspected.
        target = _unwrap_bing_redirect(html.unescape(raw_href.strip()))
        heading = _strip_tags(raw_title)
        if not (target and heading) or target in visited:
            continue
        visited.add(target)

        summary = _SNIPPET_RE.search(inner)
        collected.append(
            {
                "title": heading,
                "url": target,
                "snippet": _strip_tags(summary.group(1)) if summary else "",
            }
        )
        if len(collected) >= limit:
            break

    return collected
def _fetch_bing_html(query: str, *, count: int, market: str) -> str:
    """Request the Bing results page for *query* and return its HTML as text.

    Args:
        query: Search keywords.
        count: Requested number of results; clamped to the range 1..50.
        market: Market code such as ``"zh-CN"``. The part before the first
            ``-`` is sent as ``setlang`` and the part after it as ``cc``;
            ``"zh"`` / ``"CN"`` are used when a part cannot be derived.

    Returns:
        The response body decoded by ``_read_response``.

    Raises:
        ValueError: If Bing answers with an HTTP error status, or if the
            request fails at the network level (DNS, connection, TLS, or a
            timeout/reset while the body is being read).
    """
    language, separator, region = market.partition("-")
    params = {
        "q": query,
        "count": str(min(max(count, 1), 50)),
        "setlang": language if market else "zh",
        "cc": region if separator else "CN",
        "form": "QBLH",
    }
    url = f"{_BING_SEARCH_URL}?{urllib.parse.urlencode(params)}"
    headers = {
        **_BROWSER_HEADERS_BASE,
        "User-Agent": bing_search_user_agent(),
        "Referer": "https://www.bing.com/",
    }
    req = urllib.request.Request(url, headers=headers, method="GET")
    timeout = bing_search_timeout_seconds()
    ctx = ssl.create_default_context()
    try:
        with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp:
            body = resp.read()
            return _read_response(body, resp.headers.get_content_charset(), resp.headers)
    except urllib.error.HTTPError as exc:
        # Include the start of the error page to help diagnose blocks/captchas.
        body = exc.read()
        text = _read_response(body, None, exc.headers)
        raise ValueError(f"Bing 返回 HTTP {exc.code}: {text[:200]}") from exc
    except urllib.error.URLError as exc:
        raise ValueError(f"无法访问 Bing: {exc.reason}") from exc
    except OSError as exc:
        # Failures while reading the body (read timeout, connection reset,
        # TLS errors) are raised as plain OSError subclasses rather than
        # URLError; report them through the same ValueError contract.
        raise ValueError(f"无法访问 Bing: {exc}") from exc
def bing_search(
    query: str,
    *,
    num_results: int | None = None,
    market: str | None = None,
) -> dict[str, Any]:
    """Run a Bing web search and return the parsed results.

    Args:
        query: Search keywords; surrounding whitespace is ignored.
        num_results: Desired number of results. Defaults to, and is capped
            at, the configured maximum; values below 1 are raised to 1.
        market: Market code such as ``"zh-CN"``; blank or missing values
            fall back to ``"zh-CN"``.

    Returns:
        A dict with the keys ``query``, ``engine``, ``market``, ``count``,
        ``results`` and ``note``. ``note`` carries a hint when no results
        were parsed and is ``None`` otherwise.

    Raises:
        ValueError: If the tool is disabled, or the query is empty or longer
            than 500 characters. Errors raised while fetching the page
            propagate unchanged.
    """
    if not bing_search_tool_enabled():
        raise ValueError("Bing 搜索未启用(设置 COMPILOT_BING_SEARCH_ENABLED=true)")

    keywords = str(query or "").strip()
    if not keywords:
        raise ValueError("搜索关键词不能为空")
    if len(keywords) > 500:
        raise ValueError("搜索关键词过长")

    if num_results is None:
        requested = bing_search_max_results()
    else:
        requested = num_results
    # Clamp to at least one result and at most the configured ceiling.
    capped = min(max(1, int(requested)), bing_search_max_results())

    region = (market or "").strip() or "zh-CN"

    page = _fetch_bing_html(keywords, count=capped, market=region)
    hits = _parse_results(page, limit=capped)

    hint = None
    if not hits:
        hint = (
            "未解析到结果时,可能是 Bing 页面结构变化、网络受限或触发验证。"
            "可缩小关键词后重试。"
        )

    return {
        "query": keywords,
        "engine": "bing",
        "market": region,
        "count": len(hits),
        "results": hits,
        "note": hint,
    }