# Revision 8bec5f4c, created on 2024-01-29 (commit history)
# -*- coding: UTF-8 -*-
import datetime
import os
from urllib.parse import quote

import MySQLdb
import simplejson as json
from django.template import loader
from django.conf import settings
from sql.engines import get_engine
from django.contrib.auth.decorators import permission_required
from django.http import HttpResponse, JsonResponse, FileResponse

from common.utils.extend_json_encoder import ExtendJSONEncoder
from sql.utils.resource_group import user_instances
from .models import Instance


@permission_required("sql.menu_data_dictionary", raise_exception=True)
def table_list(request):
    """Return the table list of one database for the data dictionary.

    Reads ``instance_name``, ``db_name`` and ``db_type`` from the query
    string. The JSON response body is ``{"status": 0, "data": ...}`` on
    success, or ``{"status": 1, "msg": ...}`` when a required parameter is
    missing, the instance cannot be found, or the engine raises.
    """
    params = request.GET
    instance_name = params.get("instance_name", "")
    db_name = params.get("db_name", "")
    db_type = params.get("db_type", "")

    if not (instance_name and db_name):
        # Both the instance and the database name are mandatory.
        res = {"status": 1, "msg": "非法调用!"}
    else:
        try:
            engine = get_engine(
                instance=Instance.objects.get(
                    instance_name=instance_name, db_type=db_type
                )
            )
            tables = engine.get_group_tables_by_db(
                db_name=engine.escape_string(db_name)
            )
        except Instance.DoesNotExist:
            res = {"status": 1, "msg": "Instance.DoesNotExist"}
        except Exception as exc:
            # Any engine/driver failure is reported to the client as text.
            res = {"status": 1, "msg": str(exc)}
        else:
            res = {"status": 0, "data": tables}

    body = json.dumps(res, cls=ExtendJSONEncoder, bigint_as_string=True)
    return HttpResponse(body, content_type="application/json")


@permission_required("sql.menu_data_dictionary", raise_exception=True)
def table_info(request):
    """Return metadata, column description and index data for one table.

    Reads ``instance_name``, ``db_name``, ``tb_name`` and ``db_type`` from
    the query string. On success the JSON response is
    ``{"status": 0, "data": {...}}`` where ``data`` holds the keys
    ``meta_data``, ``desc`` and ``index`` (plus ``create_sql`` for MySQL
    instances). Missing parameters, an unknown instance, or any engine error
    yield ``{"status": 1, "msg": ...}``.
    """
    instance_name = request.GET.get("instance_name", "")
    db_name = request.GET.get("db_name", "")
    tb_name = request.GET.get("tb_name", "")
    db_type = request.GET.get("db_type", "")

    if instance_name and db_name and tb_name:
        data = {}
        try:
            instance = Instance.objects.get(
                instance_name=instance_name, db_type=db_type
            )
            query_engine = get_engine(instance=instance)
            db_name = query_engine.escape_string(db_name)
            tb_name = query_engine.escape_string(tb_name)
            data["meta_data"] = query_engine.get_table_meta_data(
                db_name=db_name, tb_name=tb_name
            )
            data["desc"] = query_engine.get_table_desc_data(
                db_name=db_name, tb_name=tb_name
            )
            data["index"] = query_engine.get_table_index_data(
                db_name=db_name, tb_name=tb_name
            )

            # Only MySQL exposes the CREATE TABLE statement here; no
            # equivalent has been found for mssql yet.
            if instance.db_type == "mysql":
                # The name is placed inside a backtick-quoted identifier, so
                # a literal backtick must be doubled; otherwise a crafted
                # table name could terminate the identifier early and alter
                # the statement.
                quoted_tb_name = tb_name.replace("`", "``")
                _create_sql = query_engine.query(
                    db_name, "show create table `%s`;" % quoted_tb_name
                )
                data["create_sql"] = _create_sql.rows
            res = {"status": 0, "data": data}
        except Instance.DoesNotExist:
            res = {"status": 1, "msg": "Instance.DoesNotExist"}
        except Exception as e:
            # Any engine/driver failure is reported to the client as text.
            res = {"status": 1, "msg": str(e)}
    else:
        res = {"status": 1, "msg": "非法调用!"}
    return HttpResponse(
        json.dumps(res, cls=ExtendJSONEncoder, bigint_as_string=True),
        content_type="application/json",
    )


def get_export_full_path(base_dir: str, instance_name: str, db_name: str) -> str:
    """Build the export file path and verify it stays inside ``base_dir``.

    The file name is ``{instance_name}_{db_name}.html``. Both names may come
    from user input, so the joined path is normalised and must resolve to a
    location strictly below ``base_dir``.

    Args:
        base_dir: Directory that must contain the export file.
        instance_name: Instance name used as the first part of the file name.
        db_name: Database name used as the second part of the file name.

    Returns:
        The normalised full path, or ``""`` when the path would escape
        ``base_dir``.
    """
    base = os.path.normpath(base_dir)
    fullpath = os.path.normpath(
        os.path.join(base, f"{instance_name}_{db_name}.html")
    )
    # Compare against "<base><sep>" rather than "<base>": a bare prefix test
    # would also accept sibling directories such as "<base>_evil/...".
    prefix = base if base.endswith(os.sep) else base + os.sep
    if not fullpath.startswith(prefix):
        return ""
    return fullpath


@permission_required("sql.data_dictionary_export", raise_exception=True)
def export(request):
    """Export the data dictionary of a database, or of a whole instance.

    Reads ``instance_name`` and ``db_name`` from the query string. The
    instance must be one of the requesting user's mysql/mssql/oracle
    instances.

    * With ``db_name``: renders that database's dictionary to an HTML file
      under ``<BASE_DIR>/downloads/dictionary`` and returns it as a download.
    * Without ``db_name``: only superusers may proceed; one HTML file per
      database is written and a JSON success message is returned.

    Error conditions (instance not associated with the user, non-superuser
    requesting a whole instance, unsafe instance/database name) are reported
    as JSON with ``status`` 1.
    """
    instance_name = request.GET.get("instance_name", "")
    db_name = request.GET.get("db_name", "")

    try:
        instance = user_instances(
            request.user, db_type=["mysql", "mssql", "oracle"]
        ).get(instance_name=instance_name)
        query_engine = get_engine(instance=instance)
    except Instance.DoesNotExist:
        return JsonResponse({"status": 1, "msg": "你所在组未关联该实例!", "data": []})

    fullpath = ""
    try:
        # Regular users may only export the dictionary of a named database.
        if db_name:
            dbs = [query_engine.escape_string(db_name)]
        # Superusers may export the dictionary of the entire instance.
        elif request.user.is_superuser:
            dbs = query_engine.get_all_databases().rows
        else:
            return JsonResponse(
                {"status": 1, "msg": "仅管理员可以导出整个实例的字典信息!", "data": []}
            )

        # Fetch the data and write one HTML file per database.
        path = os.path.join(settings.BASE_DIR, "downloads", "dictionary")
        os.makedirs(path, exist_ok=True)
        for db in dbs:
            # Validate the target path before doing any expensive work.
            fullpath = get_export_full_path(path, instance_name, db)
            if not fullpath:
                return JsonResponse(
                    {"status": 1, "msg": "实例名或db名不合法", "data": []}
                )
            table_metas = query_engine.get_tables_metas_data(db_name=db)
            context = {
                # For a whole-instance export the request carries no
                # db_name, so fall back to the database being rendered.
                "db_name": db_name or db,
                "tables": table_metas,
                "export_time": datetime.datetime.now(),
            }
            data = loader.render_to_string(
                template_name="dictionaryexport.html",
                context=context,
                request=request,
            )
            with open(fullpath, "w", encoding="utf-8") as fp:
                fp.write(data)
    finally:
        # Release the connection on every exit path, including early
        # returns and exceptions.
        query_engine.close()

    if db_name:
        # Exactly one database was exported, so ``fullpath`` is its
        # already-validated file. FileResponse closes the handle itself.
        response = FileResponse(open(fullpath, "rb"))
        response["Content-Type"] = "application/octet-stream"
        response["Content-Disposition"] = (
            f'attachment;filename="{quote(instance_name)}_{quote(db_name)}.html"'
        )
        return response

    return JsonResponse(
        {
            "status": 0,
            "msg": f"实例{instance_name}数据字典导出成功,请到downloads目录下载!",
            "data": [],
        }
    )