from __future__ import annotations
from typing import Annotated, Any
from fastapi import APIRouter, Depends, HTTPException, Request
from app.core.config import rate_limit_login
from app.core.deps import get_current_user
from app.core.rate_limit import limiter
from app.core.security import create_access_token
from app.schemas.auth import (
ChangePasswordRequest,
LoginRequest,
LoginResponse,
UserPublic,
)
from app.services import user_store
# Router for the authentication endpoints; every route registered on it is
# served under the /api/auth prefix and tagged "auth".
router = APIRouter(prefix="/api/auth", tags=["auth"])
def _client_ip(request: Request) -> str:
    """Return the address used to identify the caller of *request*.

    Resolution order:
      1. the first comma-separated entry of the ``X-Forwarded-For`` header,
         stripped of surrounding whitespace, when it is non-empty;
      2. ``request.client.host`` when the request carries client info;
      3. the literal string ``"unknown"``.
    """
    # NOTE(review): X-Forwarded-For is supplied by the caller unless a trusted
    # proxy overwrites it. The result is used as a rate-limit key in this
    # file, so confirm the deployment always sits behind such a proxy.
    header_value = request.headers.get("X-Forwarded-For") or ""
    first_hop = header_value.partition(",")[0].strip()
    if first_hop:
        return first_hop

    peer = request.client
    return peer.host if peer else "unknown"
def _public(user: dict[str, Any]) -> UserPublic:
    """Build the ``UserPublic`` response model for a user record.

    The record is first passed through ``user_store.to_public`` and the
    resulting mapping is expanded into ``UserPublic`` keyword arguments.
    """
    # presumably to_public drops private fields; verify against user_store
    public_fields = user_store.to_public(user)
    return UserPublic(**public_fields)
@router.post("/login", response_model=LoginResponse)
async def login(payload: LoginRequest, request: Request):
    # Authenticate a username/password pair and return an access token.
    #
    # Flow:
    #   1. Rate-limit by client address; an exhausted bucket yields HTTP 429.
    #   2. Check the credentials via user_store; a falsy result yields HTTP 401.
    #   3. Issue a token whose claims carry the user's id ("sub") and role, and
    #      return it together with the public view of the user.
    #
    # Documented with comments rather than a docstring because FastAPI would
    # publish a handler docstring as the OpenAPI operation description.
    max_attempts, window = rate_limit_login()
    bucket = f"login:{_client_ip(request)}"
    within_limit = limiter.allow(bucket, max_attempts, window)
    if not within_limit:
        raise HTTPException(status_code=429, detail="登录尝试过于频繁,请稍后再试")

    account = user_store.authenticate(payload.username, payload.password)
    if not account:
        raise HTTPException(status_code=401, detail="用户名或密码错误")

    claims = {"sub": account["id"], "role": account["role"]}
    token = create_access_token(claims)
    public_profile = _public(account).model_dump()
    return LoginResponse(access_token=token, user=public_profile)
@router.get("/me", response_model=UserPublic)
async def me(user: Annotated[dict[str, Any], Depends(get_current_user)]):
    # Return the public view of the user resolved by the get_current_user
    # dependency. (Comment, not docstring: FastAPI would expose a docstring
    # as the OpenAPI operation description.)
    profile = _public(user)
    return profile
@router.post("/me/password")
async def change_my_password(
    payload: ChangePasswordRequest,
    user: Annotated[dict[str, Any], Depends(get_current_user)],
):
    # Change the password of the user resolved by get_current_user.
    #
    # Delegates to user_store.change_password with the user's id and the
    # current/new passwords from the payload. A ValueError raised by the store
    # is reported as HTTP 400 with the exception message as the detail; on
    # success the response body is {"ok": True}.
    #
    # (Comment, not docstring: FastAPI would expose a docstring as the OpenAPI
    # operation description.)
    user_id = user["id"]
    try:
        user_store.change_password(
            user_id,
            payload.current_password,
            payload.new_password,
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    else:
        return {"ok": True}