# Snapshot of commit 8bec5f4c, created 2024-01-29 (residue of the repository's commit-history page header).
# -*- coding: UTF-8 -*-
import MySQLdb
import simplejson as json
from django.contrib.auth.decorators import permission_required
from django.contrib.auth.password_validation import validate_password
from django.core.exceptions import ValidationError

from django.http import HttpResponse, JsonResponse
from sql.utils.instance_management import (
    SUPPORTED_MANAGEMENT_DB_TYPE,
    get_instanceaccount_unique_value,
    get_instanceaccount_unique_key,
)
from common.utils.extend_json_encoder import ExtendJSONEncoder
from sql.engines import get_engine, ResultSet
from sql.utils.resource_group import user_instances
from .models import Instance, InstanceAccount


@permission_required("sql.menu_instance_account", raise_exception=True)
def users(request):
    """Return the database accounts of an instance as a JSON response.

    POST parameters:
        instance_id: primary key of the instance to inspect.
        saved: the string ``"true"`` restricts the result to accounts flagged
            as saved (i.e. merged with a stored ``InstanceAccount`` record).

    Responses:
        * ``{"status": 0, "msg": "", "data": []}`` when ``instance_id`` is empty.
        * ``{"status": 1, "msg": ..., "data": []}`` when the instance is not in
          ``user_instances(request.user, ...)``.
        * ``{"status": 1, "msg": <engine error>}`` when the engine query fails.
        * ``{"status": 0, "msg": "ok", "rows": [...]}`` on success.
    """
    instance_id = request.POST.get("instance_id")
    # Whether to return only the accounts recorded on the platform.
    saved = request.POST.get("saved") == "true"

    if not instance_id:
        return JsonResponse({"status": 0, "msg": "", "data": []})
    try:
        instance = user_instances(
            request.user, db_type=SUPPORTED_MANAGEMENT_DB_TYPE
        ).get(id=instance_id)
    except Instance.DoesNotExist:
        return JsonResponse({"status": 1, "msg": "你所在组未关联该实例", "data": []})

    # Accounts already recorded on the platform, indexed by their unique value.
    cnf_users = {}
    for account in InstanceAccount.objects.filter(instance=instance).values(
        "id", "user", "host", "db_name", "remark"
    ):
        account["saved"] = True
        cnf_users[get_instanceaccount_unique_value(instance.db_type, account)] = account

    # Accounts that actually exist on the instance.
    query_engine = get_engine(instance=instance)
    try:
        query_result = query_engine.get_instance_users_summary()
        if query_result.error:
            result = {"status": 1, "msg": query_result.error}
        else:
            key = get_instanceaccount_unique_key(db_type=instance.db_type)
            rows = []
            for row in query_result.rows:
                # Overlay the stored record (id, remark, saved flag, ...) on
                # top of the live account data.
                stored = cnf_users.get(row[key])
                if stored is not None:
                    row = dict(row, **stored)
                rows.append(row)
            if saved:
                # Rows without a stored record may carry no "saved" key at
                # all; treat them as not saved instead of raising KeyError.
                rows = [row for row in rows if row.get("saved")]
            result = {"status": 0, "msg": "ok", "rows": rows}
    finally:
        # Close the connection even when the query or the merge raises.
        query_engine.close()

    return HttpResponse(
        json.dumps(result, cls=ExtendJSONEncoder, bigint_as_string=True),
        content_type="application/json",
    )


@permission_required("sql.instance_account_manage", raise_exception=True)
def create(request):
    """Create a database account on an instance and record it on the platform.

    POST parameters:
        instance_id: primary key of the target instance.
        db_name: database name (required for ``mongo`` instances).
        user: account name.
        host: account host (required for ``mysql`` instances).
        password1, password2: the new password, entered twice.
        remark: optional free-text remark.

    Returns a ``JsonResponse`` whose ``status`` is ``0`` on success and ``1``
    when the instance is not accessible, parameters are missing, the passwords
    differ, the password fails Django's validators, or the engine reports an
    error.
    """
    instance_id = request.POST.get("instance_id", 0)
    db_name = request.POST.get("db_name")
    user = request.POST.get("user")
    host = request.POST.get("host")
    password1 = request.POST.get("password1")
    password2 = request.POST.get("password2")
    remark = request.POST.get("remark", "")

    try:
        instance = user_instances(
            request.user, db_type=SUPPORTED_MANAGEMENT_DB_TYPE
        ).get(id=instance_id)
    except Instance.DoesNotExist:
        return JsonResponse({"status": 1, "msg": "你所在组未关联该实例", "data": []})

    if (
        instance.db_type == "mysql" and not all([user, host, password1, password2])
    ) or (
        instance.db_type == "mongo" and not all([db_name, user, password1, password2])
    ):
        return JsonResponse(
            {"status": 1, "msg": "参数不完整,请确认后提交", "data": []}
        )

    if password1 != password2:
        return JsonResponse({"status": 1, "msg": "两次输入密码不一致", "data": []})

    # TODO: currently relies on Django's built-in validators; a dedicated
    # validator is planned.
    try:
        validate_password(password1, user=None, password_validators=None)
    except ValidationError as msg:
        return JsonResponse({"status": 1, "msg": f"{msg}", "data": []})

    engine = get_engine(instance=instance)
    try:
        exec_result = engine.create_instance_user(
            db_name=db_name, user=user, host=host, password1=password1, remark=remark
        )
    finally:
        # Close the connection even if the engine call raises.
        engine.close()

    if exec_result.error:
        return JsonResponse({"status": 1, "msg": exec_result.error})

    # Persist the account rows returned by the engine.
    InstanceAccount.objects.bulk_create(
        [InstanceAccount(**row) for row in exec_result.rows]
    )
    return JsonResponse({"status": 0, "msg": "", "data": []})


@permission_required("sql.instance_account_manage", raise_exception=True)
def edit(request):
    """Update, or record for the first time, an account on the platform.

    Only the platform's ``InstanceAccount`` record is touched; no engine call
    is made here. The record is looked up by (instance, user, host, db_name);
    ``remark`` is always written, ``password`` only when one was submitted.

    Returns a ``JsonResponse`` with ``status`` ``0`` on success, ``1`` when the
    instance is not accessible or required parameters are missing.
    """
    post = request.POST
    instance_id = post.get("instance_id", 0)
    db_name = post.get("db_name", "")
    user = post.get("user")
    host = post.get("host", "")
    password = post.get("password")
    remark = post.get("remark", "")

    accessible = user_instances(request.user, db_type=SUPPORTED_MANAGEMENT_DB_TYPE)
    try:
        instance = accessible.get(id=instance_id)
    except Instance.DoesNotExist:
        return JsonResponse({"status": 1, "msg": "你所在组未关联该实例", "data": []})

    # Fields that must be non-empty, per database type; other types are not
    # checked.
    required_fields = {"mysql": (user, host), "mongo": (db_name, user)}
    required = required_fields.get(instance.db_type)
    if required is not None and not all(required):
        return JsonResponse(
            {"status": 1, "msg": "参数不完整,请确认后提交", "data": []}
        )

    # Leave the stored password untouched when none was submitted.
    if password:
        defaults = {"password": password, "remark": remark}
    else:
        defaults = {"remark": remark}

    InstanceAccount.objects.update_or_create(
        instance=instance,
        user=user,
        host=host,
        db_name=db_name,
        defaults=defaults,
    )
    return JsonResponse({"status": 0, "msg": "", "data": []})


# Characters allowed in a MySQL privilege name such as "SELECT",
# "CREATE TEMPORARY TABLES" or "BACKUP_ADMIN".
_MYSQL_PRIV_NAME_CHARS = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz_ "
)


def _is_mysql_priv_name(name):
    """Return True if *name* is a non-blank string of letters, spaces and underscores."""
    return (
        isinstance(name, str)
        and bool(name.strip())
        and set(name) <= _MYSQL_PRIV_NAME_CHARS
    )


def _quote_mysql_identifier(name):
    """Backtick-quote *name*, doubling embedded backticks so it cannot end the quoting."""
    return "`{}`".format(str(name).replace("`", "``"))


def _build_mysql_grant_sql(post, user_host, op_type, priv_type, privs):
    """Build the GRANT/REVOKE statement(s) for a MySQL privilege change.

    Args:
        post: the request's POST data (``get`` / ``getlist`` are used).
        user_host: already-escaped account specifier placed after TO/FROM.
        op_type: ``0`` to GRANT, ``1`` to REVOKE.
        priv_type: ``0`` global, ``1`` database, ``2`` table, ``3`` column.
        privs: dict holding the privilege list under ``global_privs``,
            ``db_privs``, ``tb_privs`` or ``col_privs``.

    Returns:
        The concatenated SQL string, or ``None`` when ``op_type``/``priv_type``
        is unsupported, a required value is missing, or a privilege name
        contains anything other than letters, spaces and underscores.
    """
    keywords = {0: ("GRANT", "TO"), 1: ("REVOKE", "FROM")}.get(op_type)
    priv_key = {
        0: "global_privs",
        1: "db_privs",
        2: "tb_privs",
        3: "col_privs",
    }.get(priv_type)
    if keywords is None or priv_key is None:
        return None
    verb, link = keywords

    priv_names = privs.get(priv_key)
    if (
        not isinstance(priv_names, list)
        or not priv_names
        or not all(_is_mysql_priv_name(priv) for priv in priv_names)
    ):
        return None

    # Column privileges: one statement per privilege, columns in parentheses.
    # The "GRANT" -> "GRANT OPTION" shorthand is not applied at this level.
    if priv_type == 3:
        db_name = post.get("db_name")
        tb_name = post.get("tb_name")
        col_names = post.getlist("col_name[]")
        if not all([db_name, tb_name, col_names]):
            return None
        columns = ",".join(_quote_mysql_identifier(col) for col in col_names)
        target = (
            f"{_quote_mysql_identifier(db_name)}.{_quote_mysql_identifier(tb_name)}"
        )
        return "".join(
            f"{verb} {priv}({columns}) ON {target} {link} {user_host};"
            for priv in priv_names
        )

    # The front end sends "GRANT" as shorthand for the GRANT OPTION privilege.
    priv_list = ",".join(
        "GRANT OPTION" if priv == "GRANT" else priv for priv in priv_names
    )

    if priv_type == 0:
        targets = ["*.*"]
    elif priv_type == 1:
        db_names = post.getlist("db_name[]")
        if not db_names:
            return None
        targets = [f"{_quote_mysql_identifier(db)}.*" for db in db_names]
    else:
        db_name = post.get("db_name")
        tb_names = post.getlist("tb_name[]")
        if not all([db_name, tb_names]):
            return None
        quoted_db = _quote_mysql_identifier(db_name)
        targets = [f"{quoted_db}.{_quote_mysql_identifier(tb)}" for tb in tb_names]

    return "".join(
        f"{verb} {priv_list} ON {target} {link} {user_host};" for target in targets
    )


@permission_required("sql.instance_account_manage", raise_exception=True)
def grant(request):
    """Build and execute a privilege change for a database account.

    For ``mysql`` instances the POST data supplies ``user_host``, ``op_type``
    (0 grant / 1 revoke), ``priv_type`` (0 global, 1 database, 2 table,
    3 column), ``privs`` (JSON object) and the matching ``db_name`` /
    ``tb_name`` / ``col_name`` values; the generated SQL is executed against
    the ``mysql`` database and returned in ``data``.

    For ``mongo`` instances ``db_name_user`` (``"<db>.<user>"``) and
    ``roles[]`` are read and an ``updateUser`` command is issued.

    Returns a ``JsonResponse`` with ``status`` ``0`` on success, ``1`` when
    the instance is not accessible, the submitted data is incomplete or
    invalid, or the engine reports an error.
    """
    instance_id = request.POST.get("instance_id", 0)
    grant_sql = ""

    try:
        instance = user_instances(
            request.user, db_type=SUPPORTED_MANAGEMENT_DB_TYPE
        ).get(id=instance_id)
    except Instance.DoesNotExist:
        return JsonResponse({"status": 1, "msg": "你所在组未关联该实例", "data": []})

    if instance.db_type not in ("mysql", "mongo"):
        # Only these two types are handled below; fail with a clear message
        # rather than an unbound-variable error.
        return JsonResponse(
            {"status": 1, "msg": "暂不支持该类型实例的权限变更", "data": []}
        )

    def incomplete():
        return JsonResponse(
            {"status": 1, "msg": "信息不完整,请确认后提交", "data": []}
        )

    engine = get_engine(instance=instance)
    try:
        if instance.db_type == "mysql":
            user_host = request.POST.get("user_host")
            try:
                op_type = int(request.POST.get("op_type"))
                priv_type = int(request.POST.get("priv_type"))
                privs = json.loads(request.POST.get("privs"))
            except (TypeError, ValueError):
                # Missing or malformed parameters (JSON decode errors are
                # ValueError subclasses).
                return incomplete()
            if not user_host or not isinstance(privs, dict):
                return incomplete()

            # escape
            user_host = engine.escape_string(user_host)

            grant_sql = _build_mysql_grant_sql(
                request.POST, user_host, op_type, priv_type, privs
            )
            if not grant_sql:
                # Reset so a later refactor cannot leak None into the response.
                grant_sql = ""
                return incomplete()

            # Execute the generated statement(s).
            exec_result = engine.execute(db_name="mysql", sql=grant_sql)
        else:
            db_name_user = request.POST.get("db_name_user") or ""
            roles = request.POST.getlist("roles[]")
            # Split on the first dot only so user names containing dots are
            # kept intact.
            db_name, _, user = db_name_user.partition(".")
            if not (db_name and user):
                return incomplete()
            exec_result = ResultSet()
            try:
                conn = engine.get_connection()
                conn[db_name].command("updateUser", user, roles=roles)
            except Exception as e:
                # Surface any driver error to the caller as a normal failure.
                exec_result.error = str(e)
    finally:
        # Close the connection on every path, including early returns.
        engine.close()

    if exec_result.error:
        return JsonResponse({"status": 1, "msg": exec_result.error})
    return JsonResponse({"status": 0, "msg": "", "data": grant_sql})


@permission_required("sql.instance_account_manage", raise_exception=True)
def reset_pwd(request):
    """Reset the password of a database account and store the new password.

    POST parameters:
        instance_id: primary key of the target instance.
        user_host / db_name_user: account specifiers forwarded to the engine's
            ``reset_instance_user_pwd``.
        user, host, db_name: identify the platform's ``InstanceAccount`` record.
        reset_pwd1, reset_pwd2: the new password, entered twice.

    Returns a ``JsonResponse`` with ``status`` ``0`` on success, ``1`` when the
    instance is not accessible, parameters are missing, the passwords differ,
    the password fails Django's validators, or the engine reports an error.
    """
    instance_id = request.POST.get("instance_id", 0)
    db_name_user = request.POST.get("db_name_user")
    db_name = request.POST.get("db_name", "")
    user_host = request.POST.get("user_host")
    user = request.POST.get("user")
    host = request.POST.get("host", "")
    reset_pwd1 = request.POST.get("reset_pwd1")
    reset_pwd2 = request.POST.get("reset_pwd2")

    try:
        instance = user_instances(
            request.user, db_type=SUPPORTED_MANAGEMENT_DB_TYPE
        ).get(id=instance_id)
    except Instance.DoesNotExist:
        return JsonResponse({"status": 1, "msg": "你所在组未关联该实例", "data": []})

    if (
        instance.db_type == "mysql" and not all([user, host, reset_pwd1, reset_pwd2])
    ) or (
        instance.db_type == "mongo" and not all([db_name, user, reset_pwd1, reset_pwd2])
    ):
        return JsonResponse(
            {"status": 1, "msg": "参数不完整,请确认后提交", "data": []}
        )

    if reset_pwd1 != reset_pwd2:
        return JsonResponse({"status": 1, "msg": "两次输入密码不一致", "data": []})

    # TODO: currently relies on Django's built-in validators; a dedicated
    # validator is planned.
    try:
        validate_password(reset_pwd1, user=None, password_validators=None)
    except ValidationError as msg:
        return JsonResponse({"status": 1, "msg": f"{msg}", "data": []})

    engine = get_engine(instance=instance)
    try:
        exec_result = engine.reset_instance_user_pwd(
            user_host=user_host, db_name_user=db_name_user, reset_pwd=reset_pwd1
        )
    finally:
        # Close the connection even if the engine call raises.
        engine.close()

    if exec_result.error:
        # Same response helper as the other views in this module.
        return JsonResponse({"status": 1, "msg": exec_result.error})

    # Persist the new password on the platform record.
    InstanceAccount.objects.update_or_create(
        instance=instance,
        user=user,
        host=host,
        db_name=db_name,
        defaults={"password": reset_pwd1},
    )
    return JsonResponse({"status": 0, "msg": "", "data": []})


@permission_required("sql.instance_account_manage", raise_exception=True)
def lock(request):
    """Lock or unlock a MySQL account.

    POST parameters:
        instance_id: primary key of the target ``mysql`` instance.
        user_host: account specifier used in the ``ALTER USER`` statement.
        is_locked: the account's current state; ``"N"`` locks the account,
            ``"Y"`` unlocks it.

    Returns a ``JsonResponse`` with ``status`` ``0`` on success, ``1`` when
    parameters are missing or invalid, the instance is not accessible, or the
    engine reports an error.
    """
    instance_id = request.POST.get("instance_id", 0)
    user_host = request.POST.get("user_host")
    is_locked = request.POST.get("is_locked")

    # Toggle the current state; any other value yields no action, and is
    # rejected instead of sending an empty statement to the server.
    action = {"N": "LOCK", "Y": "UNLOCK"}.get(is_locked)
    if not all([user_host, action]):
        return JsonResponse(
            {"status": 1, "msg": "参数不完整,请确认后提交", "data": []}
        )

    try:
        instance = user_instances(request.user, db_type=["mysql"]).get(id=instance_id)
    except Instance.DoesNotExist:
        return JsonResponse({"status": 1, "msg": "你所在组未关联该实例", "data": []})

    engine = get_engine(instance=instance)
    try:
        # escape
        user_host = engine.escape_string(user_host)
        lock_sql = f"ALTER USER {user_host} ACCOUNT {action};"
        exec_result = engine.execute(db_name="mysql", sql=lock_sql)
    finally:
        # Close the connection, as the other views in this module do.
        engine.close()

    if exec_result.error:
        return JsonResponse({"status": 1, "msg": exec_result.error})
    return JsonResponse({"status": 0, "msg": "", "data": []})


@permission_required("sql.instance_account_manage", raise_exception=True)
def delete(request):
    """Drop a database account and remove its platform record.

    POST parameters:
        instance_id: primary key of the target instance.
        user_host: account specifier (required for ``mysql`` instances).
        db_name_user: account specifier (required for ``mongo`` instances).
        user, host, db_name: identify the platform's ``InstanceAccount`` record.

    Returns a ``JsonResponse`` with ``status`` ``0`` on success, ``1`` when the
    instance is not accessible, parameters are missing, or the engine reports
    an error.
    """
    instance_id = request.POST.get("instance_id", 0)
    db_name_user = request.POST.get("db_name_user")
    user_host = request.POST.get("user_host")
    user = request.POST.get("user")
    # Default to "" like edit() and reset_pwd(), which store "" when these
    # fields are omitted; a None filter value would not match those records.
    db_name = request.POST.get("db_name", "")
    host = request.POST.get("host", "")

    try:
        instance = user_instances(
            request.user, db_type=SUPPORTED_MANAGEMENT_DB_TYPE
        ).get(id=instance_id)
    except Instance.DoesNotExist:
        return JsonResponse({"status": 1, "msg": "你所在组未关联该实例", "data": []})

    if (instance.db_type == "mysql" and not all([user_host])) or (
        instance.db_type == "mongo" and not all([db_name_user])
    ):
        return JsonResponse(
            {"status": 1, "msg": "参数不完整,请确认后提交", "data": []}
        )

    engine = get_engine(instance=instance)
    try:
        exec_result = engine.drop_instance_user(
            user_host=user_host, db_name_user=db_name_user
        )
    finally:
        # Close the connection even if the engine call raises.
        engine.close()

    if exec_result.error:
        return JsonResponse({"status": 1, "msg": exec_result.error})

    # Remove the matching platform record.
    InstanceAccount.objects.filter(
        instance=instance, user=user, host=host, db_name=db_name
    ).delete()
    return JsonResponse({"status": 0, "msg": "", "data": []})