"""密码哈希与 JWT。"""
from __future__ import annotations
import secrets
from datetime import datetime, timedelta, timezone
from typing import Any
import bcrypt
import jwt
from app.core.config import get_jwt_secret, jwt_expire_hours
from app.core.paths import JWT_SECRET_FILE, ensure_data_dirs
JWT_ALGORITHM = "HS256"
def load_dev_jwt_secret() -> str:
"""开发环境:从 data/.jwt_secret 读取或生成。"""
ensure_data_dirs()
if JWT_SECRET_FILE.is_file():
return JWT_SECRET_FILE.read_text(encoding="utf-8").strip()
secret = secrets.token_hex(32)
JWT_SECRET_FILE.write_text(secret, encoding="utf-8")
return secret
def hash_password(password: str) -> str:
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
def verify_password(plain: str, hashed: str) -> bool:
try:
return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8"))
except ValueError:
return False
def create_access_token(payload: dict[str, Any], hours: int | None = None) -> str:
data = dict(payload)
now = datetime.now(timezone.utc)
expire_hours = hours if hours is not None else jwt_expire_hours()
data["iat"] = now
data["exp"] = now + timedelta(hours=expire_hours)
return jwt.encode(data, get_jwt_secret(), algorithm=JWT_ALGORITHM)
def decode_access_token(token: str) -> dict[str, Any]:
return jwt.decode(token, get_jwt_secret(), algorithms=[JWT_ALGORITHM])
def decode_access_token_optional(token: str) -> dict[str, Any] | None:
try:
return decode_access_token(token)
except Exception:
return None