"""用户鉴权"""
import grp
import logging
import pwd
from starlette import status
from starlette.exceptions import HTTPException
from starlette.requests import HTTPConnection
from apps.services.personal_token import PersonalTokenManager
from apps.services.user import UserManager
logger = logging.getLogger(__name__)
def is_admin(username: str) -> bool:
"""
判断用户是否为管理员
管理员条件:
1. 用户id为0 (root用户)
2. 用户在"wheel"组中
:param username: Linux用户名
:return: 如果用户是管理员返回True,否则返回False
"""
try:
user_info = pwd.getpwnam(username)
if user_info.pw_uid == 0:
return True
except KeyError:
logger.warning("系统中未找到用户 '%s'", username)
return False
except OSError:
logger.exception("访问用户 %s 的信息时出错", username)
return False
try:
wheel_group = grp.getgrnam("wheel")
if username in wheel_group.gr_mem:
return True
except KeyError:
logger.warning("系统中未找到用户组 'wheel'")
except OSError:
logger.exception("访问用户组 'wheel' 信息时出错")
return False
async def verify_personal_token(request: HTTPConnection) -> None:
"""
验证Personal Token是否有效;作为第二层鉴权检查
- 如果已经通过verify_session设置了user_id,则跳过
- 如果Bearer token以sk-开头,则作为Personal Token校验
- 合法则设置user_id,不合法则抛出401
:param request: HTTP请求
:return:
"""
if hasattr(request.state, "user_id"):
return
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
logger.warning("Personal Token鉴权失败:缺少或格式错误的Bearer token")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Personal Token 鉴权失败:需要Bearer token",
)
token = auth_header.split(" ", 1)[1]
if token.startswith("sk-"):
user_id = await PersonalTokenManager.get_user_by_personal_token(token)
if user_id is not None:
request.state.personal_token = token
request.state.user_id = user_id
else:
logger.warning("Personal Token鉴权失败:无效的token")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Personal Token 鉴权失败",
)
async def verify_admin(request: HTTPConnection) -> None:
"""验证用户是否为管理员"""
if not hasattr(request.state, "user_id"):
logger.warning("管理员鉴权失败:用户未登录")
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="用户未登录")
user_id = request.state.user_id
user = await UserManager.get_user(user_id)
request.state.user = user
if not user:
logger.warning("管理员鉴权失败:用户不存在,user_id=%s", user_id)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="用户不存在")
if not is_admin(user.userName):
logger.warning("管理员鉴权失败:用户无管理员权限,user_id=%s", user_id)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="用户无权限")