"""FastAPI 依赖:JWT 认证。"""
from __future__ import annotations
from typing import Annotated, Any
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from app.core.context import set_current_user
from app.core.security import decode_access_token
from app.services import user_store
# Shared bearer-token extractor. auto_error=False so that a request without
# credentials reaches get_optional_user with creds=None (handled there)
# instead of being rejected by the security scheme itself.
_bearer = HTTPBearer(auto_error=False)
async def get_optional_user(
    creds: Annotated[HTTPAuthorizationCredentials | None, Depends(_bearer)],
) -> dict[str, Any] | None:
    """Resolve the bearer token to an active user, or ``None``.

    Best-effort by design: this dependency never raises. Every outcome is
    also published through ``set_current_user`` (the user on success,
    ``None`` otherwise).

    Args:
        creds: Credentials extracted by the ``_bearer`` scheme; ``None`` when
            the request carries no bearer credentials.

    Returns:
        The record returned by ``user_store.get_user`` when the token decodes,
        its ``sub`` claim is non-empty, the user exists and its ``status`` is
        ``"active"``; otherwise ``None``.
    """
    if not creds or not creds.credentials:
        set_current_user(None)
        return None

    try:
        payload = decode_access_token(creds.credentials)
        subject = str(payload.get("sub") or "")
        # A token without a subject can never identify a user, so do not
        # query the store with an empty id.
        user = user_store.get_user(subject) if subject else None
        if user and user.get("status") == "active":
            set_current_user(user)
            return user
    except Exception:
        # Deliberately best-effort: any failure is treated as
        # "unauthenticated". Log it so store outages or bugs are not
        # indistinguishable from a bad token.
        import logging

        logging.getLogger(__name__).debug(
            "Bearer token could not be resolved to a user", exc_info=True
        )

    # Shared fall-through for: unknown user, inactive user, or any error.
    set_current_user(None)
    return None
async def get_current_user(
    user: Annotated[dict[str, Any] | None, Depends(get_optional_user)],
) -> dict[str, Any]:
    """Require an authenticated user.

    Args:
        user: Result of ``get_optional_user``; falsy when the request is
            unauthenticated or its token could not be resolved.

    Returns:
        The same ``user`` mapping, unchanged.

    Raises:
        HTTPException: 401 when ``user`` is falsy. The response carries a
            ``WWW-Authenticate: Bearer`` challenge, as a 401 is expected to
            name the authentication scheme the client should use.
    """
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="未登录或令牌无效",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user
async def require_admin(
    user: Annotated[dict[str, Any], Depends(get_current_user)],
) -> dict[str, Any]:
    """Require that the authenticated user is an administrator.

    Args:
        user: The authenticated user supplied by ``get_current_user``.

    Returns:
        The same ``user`` mapping when its ``role`` stringifies to ``"admin"``.

    Raises:
        HTTPException: 403 for any other (or missing) role.
    """
    role = str(user.get("role"))
    if role == "admin":
        return user
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="需要管理员权限")