"""用户存储。"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
from app.core.config import allow_default_admin_login
from app.core.database import db_conn, row_to_dict
from app.core.security import hash_password, verify_password
def _now_iso() -> str:
    """Return the current UTC time as an ISO 8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
def must_change_password(user: dict[str, Any] | None) -> bool:
    """Tell whether the user record has its ``must_change_password`` flag set.

    Args:
        user: A user mapping, or ``None``.

    Returns:
        ``False`` when ``user`` is ``None`` or empty; otherwise the
        truthiness of the ``"must_change_password"`` entry (a missing key
        counts as ``False``).
    """
    return bool(user.get("must_change_password")) if user else False
def to_public(user: dict[str, Any]) -> dict[str, Any]:
    """Build the outward-facing view of a user record.

    Only the fields listed below are copied; anything else present in
    ``user`` is left out of the result.

    Args:
        user: User mapping. ``"id"`` is required; every other field is
            optional.

    Returns:
        A new dict whose values are strings, except
        ``"must_change_password"`` which is a bool. Missing or falsy
        ``role`` falls back to ``"user"``, ``status`` to ``"active"``, and
        the remaining text fields to ``""``.

    Raises:
        KeyError: If ``user`` has no ``"id"`` entry.
    """

    def _text(key: str, fallback: str = "") -> str:
        # A falsy stored value (None, "", 0) is replaced by the fallback.
        return str(user.get(key) or fallback)

    return {
        "id": str(user["id"]),
        "username": _text("username"),
        "email": _text("email"),
        "display_name": _text("display_name"),
        "role": _text("role", "user"),
        "status": _text("status", "active"),
        "must_change_password": must_change_password(user),
        "created_at": _text("created_at"),
        "updated_at": _text("updated_at"),
    }
def get_user(user_id: str) -> dict[str, Any] | None:
    """Look up a single non-deleted user by id, omitting the password hash.

    Args:
        user_id: Value matched against the ``id`` column of ``users``.

    Returns:
        The result of ``row_to_dict`` for the matching row, with the
        ``"password_hash"`` key removed. When that result is falsy (e.g. no
        row matched) it is returned unchanged.
    """
    query = "SELECT * FROM users WHERE id = ? AND deleted_at IS NULL"
    with db_conn() as conn:
        row = conn.execute(query, (user_id,)).fetchone()

    record = row_to_dict(row)
    if not record:
        return record
    # Never hand the stored hash back to callers.
    record.pop("password_hash", None)
    return record
def authenticate(username: str, password: str) -> dict[str, Any] | None:
    """Check a username/password pair against the ``users`` table.

    The username is stripped of surrounding whitespace before lookup. Only
    rows that are not soft-deleted and whose ``status`` is ``'active'`` are
    considered.

    Args:
        username: Login name as supplied by the caller.
        password: Plain-text password to verify.

    Returns:
        The user row as a dict without ``"password_hash"`` on success;
        ``None`` when no matching row exists, the password does not verify,
        or the default ``admin`` / ``admin123`` credentials are used while
        ``allow_default_admin_login()`` is falsy.
    """
    login = username.strip()
    with db_conn() as conn:
        row = conn.execute(
            """
            SELECT * FROM users
            WHERE username = ? AND deleted_at IS NULL AND status = 'active'
            """,
            (login,),
        ).fetchone()
    if not row:
        return None

    account = dict(row)
    password_ok = verify_password(password, account["password_hash"])
    if not password_ok:
        return None

    # The stock admin credentials are refused unless configuration
    # explicitly permits them; the config check only runs for that pair.
    is_default_admin = login == "admin" and password == "admin123"
    if is_default_admin and not allow_default_admin_login():
        return None

    account.pop("password_hash", None)
    return account
def change_password(user_id: str, current_password: str, new_password: str) -> None:
    """Replace a user's password after verifying the current one.

    On success the stored hash is replaced, ``must_change_password`` is set
    to ``0`` and ``updated_at`` is set to the current UTC timestamp.

    Args:
        user_id: Id of the (non-deleted) user to update.
        current_password: Plain-text password that must match the stored hash.
        new_password: Plain-text replacement; must be at least 8 characters.

    Raises:
        ValueError: If the user does not exist, the current password does
            not verify, the new password is shorter than 8 characters, or
            the update affects no row. Checks run in that order.
    """
    with db_conn() as conn:
        existing = conn.execute(
            """
            SELECT password_hash FROM users
            WHERE id = ? AND deleted_at IS NULL
            """,
            (user_id,),
        ).fetchone()

    if not existing:
        raise ValueError("未找到该用户")
    current_ok = verify_password(current_password, existing["password_hash"])
    if not current_ok:
        raise ValueError("当前密码不正确")
    if len(new_password) < 8:
        raise ValueError("新密码至少 8 位")

    with db_conn() as conn:
        params = (hash_password(new_password), _now_iso(), user_id)
        result = conn.execute(
            """
            UPDATE users SET password_hash = ?, must_change_password = 0, updated_at = ?
            WHERE id = ? AND deleted_at IS NULL
            """,
            params,
        )
        # The row may have disappeared between the lookup and the update.
        if result.rowcount == 0:
            raise ValueError("未找到该用户")