# -*- coding: UTF-8 -*-
# https://stackoverflow.com/questions/7942520/relationship-between-catalog-schema-user-and-database-instance
import logging
import traceback
import re
import sqlparse
import MySQLdb
import simplejson as json
import threading
import pandas as pd
from common.config import SysConfig
from common.utils.timer import FuncTimer
from sql.utils.sql_utils import (
    get_syntax_type,
    get_full_sqlitem_list,
    get_exec_sqlitem_list,
)
from . import EngineBase
import cx_Oracle
from .models import ResultSet, ReviewSet, ReviewResult
from sql.utils.data_masking import simple_column_mask

logger = logging.getLogger("default")


class OracleEngine(EngineBase):
    test_query = "SELECT 1 FROM DUAL"

    def __init__(self, instance=None):
        """Initialise the engine and copy the Oracle connect identifiers.

        :param instance: instance configuration object; when truthy, its
            ``service_name`` and ``sid`` attributes are copied onto the engine.
        """
        super().__init__(instance=instance)
        if not instance:
            return
        self.service_name = instance.service_name
        self.sid = instance.sid

    def get_connection(self, db_name=None):
        if self.conn:
            return self.conn
        if self.sid:
            dsn = cx_Oracle.makedsn(self.host, self.port, self.sid)
            self.conn = cx_Oracle.connect(
                self.user, self.password, dsn=dsn, encoding="UTF-8", nencoding="UTF-8"
            )
        elif self.service_name:
            dsn = cx_Oracle.makedsn(
                self.host, self.port, service_name=self.service_name
            )
            self.conn = cx_Oracle.connect(
                self.user, self.password, dsn=dsn, encoding="UTF-8", nencoding="UTF-8"
            )
        else:
            raise ValueError("sid 和 dsn 均未填写, 请联系管理页补充该实例配置.")
        return self.conn

    # Engine identification strings.
    # NOTE(review): presumably read by the engine framework for display —
    # confirm against EngineBase.
    name = "Oracle"

    info = "Oracle engine"

    @property
    def auto_backup(self):
        """Whether backup is supported; always ``True`` for this engine."""
        return True

    @staticmethod
    def get_backup_connection():
        """Open a MySQL connection to the backup database.

        Host, port, user and password are read from the
        ``inception_remote_backup_*`` keys of ``SysConfig``; the port falls
        back to 3306 when it is not configured.

        :return: a ``MySQLdb`` connection opened with autocommit enabled.
        """
        cfg = SysConfig()
        connect_kwargs = {
            "host": cfg.get("inception_remote_backup_host"),
            "port": int(cfg.get("inception_remote_backup_port", 3306)),
            "user": cfg.get("inception_remote_backup_user"),
            "passwd": cfg.get("inception_remote_backup_password"),
            "charset": "utf8mb4",
            "autocommit": True,
        }
        return MySQLdb.connect(**connect_kwargs)

    @property
    def server_version(self):
        conn = self.get_connection()
        version = conn.version
        return tuple([n for n in version.split(".")[:3]])

    def get_all_databases(self):
        """Return the database list as a ResultSet for upper layers.

        For Oracle this delegates to ``_get_all_schemas``, i.e. the list is
        actually the schema list.
        """
        return self._get_all_schemas()

    def _get_all_databases(self):
        """获取数据库列表, 返回一个ResultSet"""
        sql = "select name from v$database"
        result = self.query(sql=sql)
        db_list = [row[0] for row in result.rows]
        result.rows = db_list
        return result

    def _get_all_instances(self):
        """获取实例列表, 返回一个ResultSet"""
        sql = "select instance_name from v$instance"
        result = self.query(sql=sql)
        instance_list = [row[0] for row in result.rows]
        result.rows = instance_list
        return result

    def _get_all_schemas(self):
        """
        获取模式列表
        :return:
        """
        result = self.query(sql="SELECT username FROM all_users order by username")
        sysschema = (
            "AUD_SYS",
            "ANONYMOUS",
            "APEX_030200",
            "APEX_PUBLIC_USER",
            "APPQOSSYS",
            "BI USERS",
            "CTXSYS",
            "DBSNMP",
            "DIP USERS",
            "EXFSYS",
            "FLOWS_FILES",
            "HR USERS",
            "IX USERS",
            "MDDATA",
            "MDSYS",
            "MGMT_VIEW",
            "OE USERS",
            "OLAPSYS",
            "ORACLE_OCM",
            "ORDDATA",
            "ORDPLUGINS",
            "ORDSYS",
            "OUTLN",
            "OWBSYS",
            "OWBSYS_AUDIT",
            "PM USERS",
            "SCOTT",
            "SH USERS",
            "SI_INFORMTN_SCHEMA",
            "SPATIAL_CSW_ADMIN_USR",
            "SPATIAL_WFS_ADMIN_USR",
            "SYS",
            "SYSMAN",
            "SYSTEM",
            "WMSYS",
            "XDB",
            "XS$NULL",
            "DIP",
            "OJVMSYS",
            "LBACSYS",
            "AUDSYS",
            "DBSFWUSER",
            "DVF",
            "DVSYS",
            "GGSYS",
            "GSMADMIN_INTERNAL",
            "GSMCATUSER",
            "GSMUSER",
            "REMOTE_SCHEDULER_AGENT",
            "SYS$UMF",
            "SYSBACKUP",
            "SYSDG",
            "SYSKM",
            "SYSRAC",
        )
        schema_list = [row[0] for row in result.rows if row[0] not in sysschema]
        result.rows = schema_list
        return result

    def get_all_tables(self, db_name, **kwargs):
        """获取table 列表, 返回一个ResultSet"""
        sql = f"""SELECT table_name 
        FROM all_tables 
        WHERE nvl(tablespace_name, 'no tablespace') NOT IN ('SYSTEM', 'SYSAUX') 
        AND OWNER = :db_name AND IOT_NAME IS NULL 
        AND DURATION IS NULL order by table_name"""
        result = self.query(db_name=db_name, sql=sql, parameters={"db_name": db_name})
        tb_list = [row[0] for row in result.rows if row[0] not in ["test"]]
        result.rows = tb_list
        return result

    def get_group_tables_by_db(self, db_name):
        data = {}
        table_list_sql = f"""SELECT table_name, comments FROM  dba_tab_comments  WHERE owner = :db_name"""
        result = self.query(
            db_name=db_name, sql=table_list_sql, parameters={"db_name": db_name}
        )
        for row in result.rows:
            table_name, table_cmt = row[0], row[1]
            if table_name[0] not in data:
                data[table_name[0]] = list()
            data[table_name[0]].append([table_name, table_cmt])
        return data

    def get_table_meta_data(self, db_name, tb_name, **kwargs):
        """数据字典页面使用:获取表格的元信息,返回一个dict{column_list: [], rows: []}"""
        meta_data_sql = f"""select      tcs.TABLE_NAME, --表名
                                        tcs.COMMENTS, --表注释
                                        tcs.TABLE_TYPE,  --表/试图 table/view
                                        ss.SEGMENT_TYPE,  --段类型 堆表/分区表/IOT表
                                        ts.TABLESPACE_NAME, --表空间
                                        ts.COMPRESSION, --压缩属性
                                        bss.NUM_ROWS, --表中的记录数
                                        bss.BLOCKS, --表中数据所占的数据块数
                                        bss.EMPTY_BLOCKS, --表中的空块数
                                        bss.AVG_SPACE, --数据块中平均的使用空间
                                        bss.CHAIN_CNT, --表中行连接和行迁移的数量
                                        bss.AVG_ROW_LEN, --每条记录的平均长度
                                        bss.LAST_ANALYZED  --上次统计信息搜集的时间
                                    from dba_tab_comments tcs
                                    left join dba_segments ss
                                        on ss.owner = tcs.OWNER
                                        and ss.segment_name = tcs.TABLE_NAME
                                    left join dba_tables ts
                                        on ts.OWNER = tcs.OWNER
                                        and ts.TABLE_NAME = tcs.TABLE_NAME
                                    left join DBA_TAB_STATISTICS bss
                                        on bss.OWNER = tcs.owner
                                        and bss.TABLE_NAME = tcs.table_name
    
                                    WHERE
                                        tcs.OWNER=:db_name
                                        AND tcs.TABLE_NAME=:tb_name"""
        _meta_data = self.query(
            db_name=db_name,
            sql=meta_data_sql,
            parameters={"db_name": db_name, "tb_name": tb_name},
        )
        return {"column_list": _meta_data.column_list, "rows": _meta_data.rows[0]}

    def get_table_desc_data(self, db_name, tb_name, **kwargs):
        """获取表格字段信息"""
        desc_sql = f"""SELECT bcs.COLUMN_NAME "列名",
                            ccs.comments "列注释" ,
                            bcs.data_type || case
                             when bcs.data_precision is not null and nvl(data_scale, 0) > 0 then
                              '(' || bcs.data_precision || ',' || data_scale || ')'
                             when bcs.data_precision is not null and nvl(data_scale, 0) = 0 then
                              '(' || bcs.data_precision || ')'
                             when bcs.data_precision is null and data_scale is not null then
                              '(*,' || data_scale || ')'
                             when bcs.char_length > 0 then
                              '(' || bcs.char_length || case char_used
                                when 'B' then
                                 ' Byte'
                                when 'C' then
                                 ' Char'
                                else
                                 null
                              end || ')'
                            end "字段类型",
                            bcs.DATA_DEFAULT "字段默认值",
                            decode(nullable, 'N', ' NOT NULL') "是否为空",
                            ics.INDEX_NAME "所属索引",
                            acs.constraint_type "约束类型"
                        FROM  dba_tab_columns bcs
                        left  join dba_col_comments ccs
                            on  bcs.OWNER = ccs.owner
                            and  bcs.TABLE_NAME = ccs.table_name
                            and  bcs.COLUMN_NAME = ccs.column_name
                        left  join dba_ind_columns ics
                            on  bcs.OWNER = ics.TABLE_OWNER
                            and  bcs.TABLE_NAME = ics.table_name
                            and  bcs.COLUMN_NAME = ics.column_name
                        left join dba_constraints acs
                            on acs.owner = ics.TABLE_OWNER
                            and acs.table_name = ics.TABLE_NAME
                            and acs.index_name = ics.INDEX_NAME
                        WHERE
                            bcs.OWNER=:db_name
                            AND bcs.TABLE_NAME=:tb_name
                        ORDER BY bcs.COLUMN_ID"""
        _desc_data = self.query(
            db_name=db_name,
            sql=desc_sql,
            parameters={"db_name": db_name, "tb_name": tb_name},
        )
        return {"column_list": _desc_data.column_list, "rows": _desc_data.rows}

    def get_table_index_data(self, db_name, tb_name, **kwargs):
        """获取表格索引信息"""
        index_sql = f""" SELECT ais.INDEX_NAME "索引名称",
                                ais.uniqueness "唯一性",
                                cols.column_names "索引列名",
                                ais.index_type "索引类型",
                                ais.compression "压缩属性",
                                ais.tablespace_name "表空间",
                                ais.status "状态",
                                ais.partitioned "分区",
                                pis.partitioning_type "分区状态",
                                pis.locality "是否为LOCAL索引",
                                pis.alignment "前导列索引"
                            FROM dba_indexes ais
                            left join DBA_PART_INDEXES pis
                                on ais.owner = pis.owner
                                and ais.index_name = pis.index_name
                            left JOIN (SELECT 
                                    ics.index_owner,
                                    ics.index_name,
                                    LISTAGG(ics.column_name, ', ') WITHIN GROUP (ORDER BY ics.column_position) AS column_names
                                FROM 
                                    dba_ind_columns ics
                                GROUP BY 
                                    ics.index_owner, ics.index_name
                                    UNION ALL
                                    select lobs.owner, lobs.index_name, lobs.column_name
                                      from dba_lobs lobs
                                    ) cols
                                ON ais.owner = cols.index_owner
                                AND ais.index_name = cols.index_name
                            WHERE
                                ais.owner = :db_name
                                AND ais.table_name = :tb_name"""
        _index_data = self.query(
            db_name, index_sql, parameters={"db_name": db_name, "tb_name": tb_name}
        )
        return {"column_list": _index_data.column_list, "rows": _index_data.rows}

    def get_tables_metas_data(self, db_name, **kwargs):
        """获取数据库所有表格信息,用作数据字典导出接口"""
        table_metas = []
        sql_cols = f""" SELECT bcs.TABLE_NAME TABLE_NAME,
                                   tcs.COMMENTS TABLE_COMMENTS,
                                   bcs.COLUMN_NAME COLUMN_NAME,
                                   bcs.data_type || case
                                     when bcs.data_precision is not null and nvl(data_scale, 0) > 0 then
                                      '(' || bcs.data_precision || ',' || data_scale || ')'
                                     when bcs.data_precision is not null and nvl(data_scale, 0) = 0 then
                                      '(' || bcs.data_precision || ')'
                                     when bcs.data_precision is null and data_scale is not null then
                                      '(*,' || data_scale || ')'
                                     when bcs.char_length > 0 then
                                      '(' || bcs.char_length || case char_used
                                        when 'B' then
                                         ' Byte'
                                        when 'C' then
                                         ' Char'
                                        else
                                         null
                                      end || ')'
                                   end data_type,
                                   bcs.DATA_DEFAULT,
                                   decode(nullable, 'N', ' NOT NULL') nullable,
                                   t1.index_name,
                                   lcs.comments comments
                              FROM dba_tab_columns bcs
                              left join dba_col_comments lcs
                                on bcs.OWNER = lcs.owner
                               and bcs.TABLE_NAME = lcs.table_name
                               and bcs.COLUMN_NAME = lcs.column_name
                              left join dba_tab_comments tcs
                                on bcs.OWNER = tcs.OWNER
                               and bcs.TABLE_NAME = tcs.TABLE_NAME
                              left join (select acs.OWNER,
                                                acs.TABLE_NAME,
                                                scs.column_name,
                                                acs.index_name
                                           from dba_cons_columns scs
                                           join dba_constraints acs
                                             on acs.constraint_name = scs.constraint_name
                                            and acs.owner = scs.OWNER
                                          where acs.constraint_type = 'P') t1
                                on t1.OWNER = bcs.OWNER
                               AND t1.TABLE_NAME = bcs.TABLE_NAME
                               AND t1.column_name = bcs.COLUMN_NAME
                             WHERE bcs.OWNER = :db_name
                             order by bcs.TABLE_NAME, comments"""
        cols_req = self.query(
            sql=sql_cols, close_conn=False, parameters={"db_name": db_name}
        ).rows

        # 给查询结果定义列名,query_engine.query的游标是0 1 2
        cols_df = pd.DataFrame(
            cols_req,
            columns=[
                "TABLE_NAME",
                "TABLE_COMMENTS",
                "COLUMN_NAME",
                "COLUMN_TYPE",
                "COLUMN_DEFAULT",
                "IS_NULLABLE",
                "COLUMN_KEY",
                "COLUMN_COMMENT",
            ],
        )

        # 获得表名称去重
        col_list = cols_df.drop_duplicates("TABLE_NAME").to_dict("records")
        for cl in col_list:
            _meta = dict()
            engine_keys = [
                {"key": "COLUMN_NAME", "value": "字段名"},
                {"key": "COLUMN_TYPE", "value": "数据类型"},
                {"key": "COLUMN_DEFAULT", "value": "默认值"},
                {"key": "IS_NULLABLE", "value": "允许非空"},
                {"key": "COLUMN_KEY", "value": "是否主键"},
                {"key": "COLUMN_COMMENT", "value": "备注"},
            ]
            _meta["ENGINE_KEYS"] = engine_keys
            _meta["TABLE_INFO"] = {
                "TABLE_NAME": cl["TABLE_NAME"],
                "TABLE_COMMENTS": cl["TABLE_COMMENTS"],
            }
            table_name = cl["TABLE_NAME"]
            # 查询DataFrame中满足表名的记录,并转为list
            _meta["COLUMNS"] = cols_df.query("TABLE_NAME == @table_name").to_dict(
                "records"
            )

            table_metas.append(_meta)
        return table_metas

    def get_all_objects(self, db_name, **kwargs):
        """获取object_name 列表, 返回一个ResultSet"""
        sql = f"""SELECT object_name FROM all_objects WHERE OWNER = :db_name """
        result = self.query(db_name=db_name, sql=sql, parameters={"db_name": db_name})
        tb_list = [row[0] for row in result.rows if row[0] not in ["test"]]
        result.rows = tb_list
        return result

    def get_all_columns_by_tb(self, db_name, tb_name, **kwargs):
        """获取所有字段, 返回一个ResultSet"""
        result = self.describe_table(db_name, tb_name)
        column_list = [row[0] for row in result.rows]
        result.rows = column_list
        return result

    def describe_table(self, db_name, tb_name, **kwargs):
        """return ResultSet"""
        # https://www.thepolyglotdeveloper.com/2015/01/find-tables-oracle-database-column-name/
        sql = f"""SELECT
        a.column_name,
        data_type,
        data_length,
        nullable,
        data_default,
        b.comments
        FROM all_tab_cols a, all_col_comments b
        WHERE a.table_name = b.table_name
        and a.owner = b.OWNER
        and a.COLUMN_NAME = b.COLUMN_NAME
        and a.table_name = :tb_name and a.owner = :db_name order by column_id
        """
        result = self.query(
            db_name=db_name,
            sql=sql,
            parameters={"db_name": db_name, "tb_name": tb_name},
        )
        return result

    def object_name_check(self, db_name=None, object_name=""):
        """获取table 列表, 返回一个ResultSet"""
        if "." in object_name:
            schema_name = object_name.split(".")[0]
            object_name = object_name.split(".")[1]
            if '"' in schema_name:
                schema_name = schema_name.replace('"', "")
                if '"' in object_name:
                    object_name = object_name.replace('"', "")
                else:
                    object_name = object_name.upper()
            else:
                schema_name = schema_name.upper()
                if '"' in object_name:
                    object_name = object_name.replace('"', "")
                else:
                    object_name = object_name.upper()
        else:
            schema_name = db_name
            if '"' in object_name:
                object_name = object_name.replace('"', "")
            else:
                object_name = object_name.upper()
        sql = f""" SELECT object_name FROM all_objects WHERE OWNER = :schema_name and OBJECT_NAME = :object_name """
        result = self.query(
            db_name=db_name,
            sql=sql,
            close_conn=False,
            parameters={"schema_name": schema_name, "object_name": object_name},
        )
        if result.affected_rows > 0:
            return True
        else:
            return False

    @staticmethod
    def get_sql_first_object_name(sql=""):
        """获取sql文本中的object_name"""
        object_name = ""
        # 匹配表、索引、序列
        pattern = r"^(create|alter)\s+(table|index|unique\sindex|sequence)\s"
        groups = re.match(pattern, sql, re.M | re.IGNORECASE)

        if groups:
            object_name = (
                re.match(
                    r"^(create|alter)\s+(table|index|unique\sindex|sequence)\s+(.+?)(\s|\()",
                    sql,
                    re.M | re.IGNORECASE,
                )
                .group(3)
                .strip()
            )
            return object_name

        # 匹配创建或者替换SQL块
        pattern = r"^create\s+(or\s+replace\s+)?(function|view|procedure|trigger|package\sbody|package|type\sbody|type)\s"
        groups = re.match(pattern, sql, re.M | re.IGNORECASE)

        if groups:
            object_name = (
                re.match(
                    r"^create\s+(or\s+replace\s+)?(function|view|procedure|trigger|package\sbody|package|type\sbody|type)\s+(.+?)(\s|\()",
                    sql,
                    re.M | re.IGNORECASE,
                )
                .group(3)
                .strip()
            )
            return object_name
        return object_name

    @staticmethod
    def check_create_index_table(sql="", object_name_list=None, db_name=""):
        schema_name = '"' + db_name + '"'
        object_name_list = object_name_list or set()
        if re.match(r"^create\s+index\s", sql):
            table_name = re.match(
                r"^create\s+index\s+.+\s+on\s(.+?)(\(|\s\()", sql, re.M
            ).group(1)
            if "." not in table_name:
                table_name = f"{schema_name}.{table_name}"
            table_name = table_name.upper()
            if table_name in object_name_list:
                return True
            else:
                return False
        elif re.match(r"^create\s+unique\s+index\s", sql):
            table_name = re.match(
                r"^create\s+unique\s+index\s+.+\s+on\s(.+?)(\(|\s\()", sql, re.M
            ).group(1)
            if "." not in table_name:
                table_name = f"{schema_name}.{table_name}"
            table_name = table_name.upper()
            if table_name in object_name_list:
                return True
            else:
                return False
        else:
            return False

    @staticmethod
    def get_dml_table(sql="", object_name_list=None, db_name=""):
        schema_name = '"' + db_name + '"'
        object_name_list = object_name_list or set()
        if re.match(r"^update", sql):
            table_name = re.match(r"^update\s(.+?)\s", sql, re.M).group(1)
            if "." not in table_name:
                table_name = f"{schema_name}.{table_name}"
            table_name = table_name.upper()
            if table_name in object_name_list:
                return True
            else:
                return False
        elif re.match(r"^delete", sql):
            table_name = re.match(r"^delete\s(.+?)\s", sql, re.M).group(1)
            if "." not in table_name:
                table_name = f"{schema_name}.{table_name}"
            table_name = table_name.upper()
            if table_name in object_name_list:
                return True
            else:
                return False
        elif re.match(r"^insert\s", sql):
            table_name = re.match(
                r"^insert\s+((into)|(all\s+into)|(all\s+when\s(.+?)into))\s+(.+?)(\(|\s)",
                sql,
                re.M,
            ).group(6)
            if "." not in table_name:
                table_name = f"{schema_name}.{table_name}"
            table_name = table_name.upper()
            if table_name in object_name_list:
                return True
            else:
                return False
        else:
            return False

    @staticmethod
    def where_check(sql=""):
        if re.match(r"^update((?!where).)*$|^delete((?!where).)*$", sql):
            return True
        else:
            parsed = sqlparse.parse(sql)[0]
            flattened = list(parsed.flatten())
            n_skip = 0
            flattened = flattened[: len(flattened) - n_skip]
            logical_operators = (
                "AND",
                "OR",
                "NOT",
                "BETWEEN",
                "ORDER BY",
                "GROUP BY",
                "HAVING",
            )
            for t in reversed(flattened):
                if t.is_keyword:
                    return True
            return False

    def explain_check(self, db_name=None, sql="", close_conn=False):
        # 使用explain进行支持的SQL语法审核,连接需不中断,防止数据库不断fork进程的大批量消耗
        result = {"msg": "", "rows": 0}
        try:
            conn = self.get_connection()
            cursor = conn.cursor()
            if db_name:
                conn.current_schema = db_name
            if re.match(r"^explain", sql, re.I):
                sql = sql
            else:
                sql = f"explain plan for {sql}"
            sql = sql.rstrip(";")
            cursor.execute(sql)
            # 获取影响行数
            cursor.execute(
                "select CARDINALITY from (select CARDINALITY from PLAN_TABLE t where id = 0 order by t.timestamp desc) where rownum = 1"
            )
            rows = cursor.fetchone()
            conn.rollback()
            if not rows:
                result["rows"] = 0
            else:
                result["rows"] = rows[0]
        except Exception as e:
            logger.warning(
                f"Oracle 语句执行报错,语句:{sql},错误信息{traceback.format_exc()}"
            )
            result["msg"] = str(e)
        finally:
            if close_conn:
                self.close()
            return result

    def query_check(self, db_name=None, sql=""):
        """Check a query before execution: strip comments, keep one statement.

        :param db_name: accepted but not used by this method.
        :param sql: raw text submitted by the user.
        :return: dict with ``msg``, ``bad_query``, ``filtered_sql`` (first
            statement without comments or trailing ``;``) and ``has_star``.
            ``bad_query`` is set when there is no statement or it does not
            start with ``select``, ``with`` or ``explain``.
        """
        result = {"msg": "", "bad_query": False, "filtered_sql": sql, "has_star": False}
        # Drop comments, then keep only the first effective statement.
        try:
            first_statement = sqlparse.split(
                sqlparse.format(sql, strip_comments=True)
            )[0]
        except IndexError:
            result["bad_query"] = True
            result["msg"] = "没有有效的SQL语句"
            return result
        result["filtered_sql"] = re.sub(r";$", "", first_statement.strip())
        lowered = first_statement.lower()
        if re.match(r"^select|^with|^explain", lowered) is None:
            result["bad_query"] = True
            result["msg"] = "不支持语法!"
            return result
        if re.search(r"(^|,|\s)\*(\s|\(|$)", lowered) is not None:
            result["has_star"] = True
            result["msg"] = "禁止使用 * 关键词\n"
        return result

    def query(
        self,
        db_name=None,
        sql="",
        limit_num=0,
        close_conn=True,
        parameters=None,
        **kwargs,
    ):
        """Run a statement and return its rows in a ResultSet.

        :param db_name: when truthy, set as the connection's current schema.
        :param sql: statement to run; a trailing ``;`` is stripped. A
            statement starting with ``explain`` is executed first and the
            plan is then read through ``dbms_xplan.display``.
        :param limit_num: maximum number of rows to return; ``0`` or less
            means no limit.
        :param close_conn: close the connection afterwards when true.
        :param parameters: bind variables passed to ``cursor.execute``.
        :param kwargs: accepted but not used by this method.
        :return: ResultSet with ``column_list``, ``rows`` (list of tuples)
            and ``affected_rows``; on failure ``error`` holds the message.
        """
        result_set = ResultSet(full_sql=sql)
        try:
            conn = self.get_connection()
            cursor = conn.cursor()
            if db_name:
                conn.current_schema = db_name
            sql = sql.rstrip(";")
            # Support statements that ask for an Oracle execution plan.
            if re.match(r"^explain", sql, re.I):
                cursor.execute(sql)
                # Replace the SQL text to fetch the generated plan.
                sql = f"select PLAN_TABLE_OUTPUT from table(dbms_xplan.display)"
            cursor.execute(sql, parameters or [])
            # description is None for statements that return no rows.
            fields = cursor.description or []
            limit = int(limit_num)
            if any(col[1] == cx_Oracle.CLOB for col in fields):
                # Read LOB values row by row while iterating the cursor, and
                # stop at the limit so LOBs of unwanted rows are never read.
                rows = []
                for record in cursor:
                    rows.append(
                        tuple(
                            cell.read() if isinstance(cell, cx_Oracle.LOB) else cell
                            for cell in record
                        )
                    )
                    if 0 < limit <= len(rows):
                        break
            elif limit > 0:
                rows = cursor.fetchmany(limit)
            else:
                rows = cursor.fetchall()
            result_set.column_list = [col[0] for col in fields]
            result_set.rows = [tuple(x) for x in rows]
            result_set.affected_rows = len(result_set.rows)
        except Exception as e:
            logger.warning(
                f"Oracle 语句执行报错,语句:{sql},错误信息{traceback.format_exc()}"
            )
            result_set.error = str(e)
        finally:
            if close_conn:
                self.close()
        return result_set

    def query_masking(self, db_name=None, sql="", resultset=None):
        """简单字段脱敏规则, 仅对select有效"""
        if re.match(r"^select", sql, re.I):
            filtered_result = simple_column_mask(self.instance, resultset)
            filtered_result.is_masked = True
        else:
            filtered_result = resultset
        return filtered_result

    def execute_check(self, db_name=None, sql="", close_conn=True):
        """
        上线单执行前的检查, 返回Review set
        update by Jan.song 20200302
        使用explain对数据修改预计进行检测
        """
        config = SysConfig()
        check_result = ReviewSet(full_sql=sql)
        # explain支持的语法
        explain_re = r"^merge|^update|^delete|^insert|^create\s+table|^create\s+index|^create\s+unique\s+index"
        # 禁用/高危语句检查
        line = 1
        # 保存SQL中的新建对象
        object_name_list = set()
        critical_ddl_regex = config.get("critical_ddl_regex", "")
        p = re.compile(critical_ddl_regex)
        check_result.syntax_type = 2  # TODO 工单类型 0、其他 1、DDL,2、DML
        sqlitem = None
        try:
            sqlitemList = get_full_sqlitem_list(sql, db_name)
            for sqlitem in sqlitemList:
                sql_lower = sqlitem.statement.lower().rstrip(";")
                sql_nolower = sqlitem.statement.rstrip(";")
                object_name = self.get_sql_first_object_name(sql=sql_lower)
                if "." in object_name:
                    object_name = object_name
                else:
                    object_name = f"""{db_name}.{object_name}"""
                object_name_list.add(object_name)
                # 禁用语句
                if re.match(r"^select|^with|^explain", sql_lower):
                    result = ReviewResult(
                        id=line,
                        errlevel=2,
                        stagestatus="驳回不支持语句",
                        errormessage="仅支持DML和DDL语句,查询语句请使用SQL查询功能!",
                        sql=sqlitem.statement,
                    )
                # 高危语句
                elif critical_ddl_regex and p.match(sql_lower.strip()):
                    result = ReviewResult(
                        id=line,
                        errlevel=2,
                        stagestatus="驳回高危SQL",
                        errormessage="禁止提交匹配"
                        + critical_ddl_regex
                        + "条件的语句!",
                        sql=sqlitem.statement,
                    )
                # 驳回未带where数据修改语句,如确实需做全部删除或更新,显示的带上where 1=1
                elif re.match(
                    r"^update((?!where).)*$|^delete((?!where).)*$", sql_lower
                ):
                    result = ReviewResult(
                        id=line,
                        errlevel=2,
                        stagestatus="驳回未带where数据修改",
                        errormessage="数据修改需带where条件!",
                        sql=sqlitem.statement,
                    )
                # 驳回事务控制,会话控制SQL
                elif re.match(r"^set|^rollback|^exit", sql_lower):
                    result = ReviewResult(
                        id=line,
                        errlevel=2,
                        stagestatus="SQL中不能包含^set|^rollback|^exit",
                        errormessage="SQL中不能包含^set|^rollback|^exit",
                        sql=sqlitem.statement,
                    )

                # 通过explain对SQL做语法语义检查
                elif re.match(explain_re, sql_lower) and sqlitem.stmt_type == "SQL":
                    if self.check_create_index_table(
                        db_name=db_name,
                        sql=sql_lower,
                        object_name_list=object_name_list,
                    ):
                        result = ReviewResult(
                            id=line,
                            errlevel=1,
                            stagestatus="WARNING:新建表的新建索引语句暂无法检测!",
                            errormessage="WARNING:新建表的新建索引语句暂无法检测!",
                            stmt_type=sqlitem.stmt_type,
                            object_owner=sqlitem.object_owner,
                            object_type=sqlitem.object_type,
                            object_name=sqlitem.object_name,
                            sql=sqlitem.statement,
                        )
                    elif len(object_name_list) > 0 and self.get_dml_table(
                        db_name=db_name,
                        sql=sql_lower,
                        object_name_list=object_name_list,
                    ):
                        result = ReviewResult(
                            id=line,
                            errlevel=1,
                            stagestatus="WARNING:新建表的数据修改暂无法检测!",
                            errormessage="WARNING:新建表的数据修改暂无法检测!",
                            stmt_type=sqlitem.stmt_type,
                            object_owner=sqlitem.object_owner,
                            object_type=sqlitem.object_type,
                            object_name=sqlitem.object_name,
                            sql=sqlitem.statement,
                        )
                    else:
                        result_set = self.explain_check(
                            db_name=db_name, sql=sqlitem.statement, close_conn=False
                        )
                        if result_set["msg"]:
                            result = ReviewResult(
                                id=line,
                                errlevel=2,
                                stagestatus="explain语法检查未通过!",
                                errormessage=result_set["msg"],
                                sql=sqlitem.statement,
                            )
                        else:
                            # 对create table\create index\create unique index语法做对象存在性检测
                            if re.match(
                                r"^create\s+table|^create\s+index|^create\s+unique\s+index",
                                sql_lower,
                            ):
                                object_name = self.get_sql_first_object_name(
                                    sql=sql_nolower
                                )
                                # 保存create对象对后续SQL做存在性判断
                                if "." in object_name:
                                    schema_name = object_name.split(".")[0]
                                    object_name = object_name.split(".")[1]
                                    if '"' in schema_name:
                                        schema_name = schema_name
                                        if '"' not in object_name:
                                            object_name = object_name.upper()
                                    else:
                                        schema_name = schema_name.upper()
                                        if '"' not in object_name:
                                            object_name = object_name.upper()
                                else:
                                    schema_name = '"' + db_name + '"'
                                    if '"' not in object_name:
                                        object_name = object_name.upper()

                                object_name = f"""{schema_name}.{object_name}"""
                                if (
                                    self.object_name_check(
                                        db_name=db_name, object_name=object_name
                                    )
                                    or object_name in object_name_list
                                ):
                                    result = ReviewResult(
                                        id=line,
                                        errlevel=2,
                                        stagestatus=f"""{object_name}对象已经存在!""",
                                        errormessage=f"""{object_name}对象已经存在!""",
                                        sql=sqlitem.statement,
                                    )
                                else:
                                    object_name_list.add(object_name)
                                    if (
                                        result_set.get("rows", None)
                                        and result_set["rows"] > 1000
                                    ):
                                        result = ReviewResult(
                                            id=line,
                                            errlevel=1,
                                            stagestatus="影响行数大于1000,请关注",
                                            errormessage="影响行数大于1000,请关注",
                                            sql=sqlitem.statement,
                                            stmt_type=sqlitem.stmt_type,
                                            object_owner=sqlitem.object_owner,
                                            object_type=sqlitem.object_type,
                                            object_name=sqlitem.object_name,
                                            affected_rows=result_set["rows"],
                                            execute_time=0,
                                        )
                                    else:
                                        result = ReviewResult(
                                            id=line,
                                            errlevel=0,
                                            stagestatus="Audit completed",
                                            errormessage="None",
                                            sql=sqlitem.statement,
                                            stmt_type=sqlitem.stmt_type,
                                            object_owner=sqlitem.object_owner,
                                            object_type=sqlitem.object_type,
                                            object_name=sqlitem.object_name,
                                            affected_rows=result_set["rows"],
                                            execute_time=0,
                                        )
                            else:
                                if (
                                    result_set.get("rows", None)
                                    and result_set["rows"] > 1000
                                ):
                                    result = ReviewResult(
                                        id=line,
                                        errlevel=1,
                                        stagestatus="影响行数大于1000,请关注",
                                        errormessage="影响行数大于1000,请关注",
                                        sql=sqlitem.statement,
                                        stmt_type=sqlitem.stmt_type,
                                        object_owner=sqlitem.object_owner,
                                        object_type=sqlitem.object_type,
                                        object_name=sqlitem.object_name,
                                        affected_rows=result_set["rows"],
                                        execute_time=0,
                                    )
                                else:
                                    result = ReviewResult(
                                        id=line,
                                        errlevel=0,
                                        stagestatus="Audit completed",
                                        errormessage="None",
                                        sql=sqlitem.statement,
                                        stmt_type=sqlitem.stmt_type,
                                        object_owner=sqlitem.object_owner,
                                        object_type=sqlitem.object_type,
                                        object_name=sqlitem.object_name,
                                        affected_rows=result_set["rows"],
                                        execute_time=0,
                                    )
                # 其它无法用explain判断的语句
                else:
                    # 对alter table做对象存在性检查
                    if re.match(r"^alter\s+table\s", sql_lower):
                        object_name = self.get_sql_first_object_name(sql=sql_nolower)
                        if "." in object_name:
                            schema_name = object_name.split(".")[0]
                            object_name = object_name.split(".")[1]
                            if '"' in schema_name:
                                schema_name = schema_name
                                if '"' not in object_name:
                                    object_name = object_name.upper()
                            else:
                                schema_name = schema_name.upper()
                                if '"' not in object_name:
                                    object_name = object_name.upper()
                        else:
                            schema_name = '"' + db_name + '"'
                            if '"' not in object_name:
                                object_name = object_name.upper()

                        object_name = f"""{schema_name}.{object_name}"""
                        if (
                            not self.object_name_check(
                                db_name=db_name, object_name=object_name
                            )
                            and object_name not in object_name_list
                        ):
                            result = ReviewResult(
                                id=line,
                                errlevel=2,
                                stagestatus=f"""{object_name}对象不存在!""",
                                errormessage=f"""{object_name}对象不存在!""",
                                sql=sqlitem.statement,
                            )
                        else:
                            result = ReviewResult(
                                id=line,
                                errlevel=1,
                                stagestatus="当前平台,此语法不支持审核!",
                                errormessage="当前平台,此语法不支持审核!",
                                sql=sqlitem.statement,
                                stmt_type=sqlitem.stmt_type,
                                object_owner=sqlitem.object_owner,
                                object_type=sqlitem.object_type,
                                object_name=sqlitem.object_name,
                                affected_rows=0,
                                execute_time=0,
                            )
                    # 对create做对象存在性检查
                    elif re.match(r"^create", sql_lower):
                        object_name = self.get_sql_first_object_name(sql=sql_nolower)
                        if "." in object_name:
                            schema_name = object_name.split(".")[0]
                            object_name = object_name.split(".")[1]
                            if '"' in schema_name:
                                schema_name = schema_name
                                if '"' not in object_name:
                                    object_name = object_name.upper()
                            else:
                                schema_name = schema_name.upper()
                                if '"' not in object_name:
                                    object_name = object_name.upper()
                        else:
                            schema_name = '"' + db_name + '"'
                            if '"' not in object_name:
                                object_name = object_name.upper()

                        object_name = f"""{schema_name}.{object_name}"""
                        if re.match(r"^create\sor\sreplace", sql_lower) and (
                            self.object_name_check(
                                db_name=db_name, object_name=object_name
                            )
                            or object_name in object_name_list
                        ):
                            result = ReviewResult(
                                id=line,
                                errlevel=1,
                                stagestatus=f"""{object_name}对象已经存在,请确认是否替换!""",
                                errormessage=f"""{object_name}对象已经存在,请确认是否替换!""",
                                sql=sqlitem.statement,
                                stmt_type=sqlitem.stmt_type,
                                object_owner=sqlitem.object_owner,
                                object_type=sqlitem.object_type,
                                object_name=sqlitem.object_name,
                                affected_rows=0,
                                execute_time=0,
                            )
                        elif (
                            self.object_name_check(
                                db_name=db_name, object_name=object_name
                            )
                            or object_name in object_name_list
                        ):
                            result = ReviewResult(
                                id=line,
                                errlevel=2,
                                stagestatus=f"""{object_name}对象已经存在!""",
                                errormessage=f"""{object_name}对象已经存在!""",
                                sql=sqlitem.statement,
                            )
                        else:
                            object_name_list.add(object_name)
                            result = ReviewResult(
                                id=line,
                                errlevel=1,
                                stagestatus="当前平台,此语法不支持审核!",
                                errormessage="当前平台,此语法不支持审核!",
                                sql=sqlitem.statement,
                                stmt_type=sqlitem.stmt_type,
                                object_owner=sqlitem.object_owner,
                                object_type=sqlitem.object_type,
                                object_name=sqlitem.object_name,
                                affected_rows=0,
                                execute_time=0,
                            )
                    else:
                        result = ReviewResult(
                            id=line,
                            errlevel=1,
                            stagestatus="当前平台,此语法不支持审核!",
                            errormessage="当前平台,此语法不支持审核!",
                            sql=sqlitem.statement,
                            stmt_type=sqlitem.stmt_type,
                            object_owner=sqlitem.object_owner,
                            object_type=sqlitem.object_type,
                            object_name=sqlitem.object_name,
                            affected_rows=0,
                            execute_time=0,
                        )
                # 判断工单类型
                if get_syntax_type(sql=sqlitem.statement, db_type="oracle") == "DDL":
                    check_result.syntax_type = 1
                check_result.rows += [result]
                line += 1
        except Exception as e:
            logger.warning(
                f"Oracle 语句执行报错,第{line}个SQL:{sqlitem.statement},错误信息{traceback.format_exc()}"
            )
            check_result.error = str(e)
        finally:
            if close_conn:
                self.close()
        # 统计警告和错误数量
        for r in check_result.rows:
            if r.errlevel == 1:
                check_result.warning_count += 1
            if r.errlevel == 2:
                check_result.error_count += 1
        return check_result

    def execute_workflow(self, workflow, close_conn=True):
        """Execute an approved workflow statement by statement and return a ReviewSet.

        The statements to run are taken from the stored review result
        (``review_content``) instead of re-splitting ``sql_content``. For PL/SQL
        object definitions the object's status in ALL_OBJECTS is checked after
        execution; an INVALID or missing object is treated as a failure.

        :param workflow: workflow object; this method reads ``sqlworkflowcontent``,
            ``db_name``, ``syntax_type``, ``is_backup`` and ``id`` from it
        :param close_conn: close the engine connection when finished
        :return: ReviewSet with one ReviewResult per statement. On failure the
            failing statement is marked "Execute Failed" and the remaining
            statements are appended as not executed.
        """
        review_content = workflow.sqlworkflowcontent.review_content
        review_result = json.loads(review_content)
        sqlitemList = get_exec_sqlitem_list(review_result, workflow.db_name)

        sql = workflow.sqlworkflowcontent.sql_content
        execute_result = ReviewSet(full_sql=sql)

        line = 1
        statement = None
        # Pre-bind so the ``finally`` block can tell whether a session was ever
        # opened instead of tripping over an unbound name.
        cursor = None
        begin_time = None
        try:
            conn = self.get_connection()
            cursor = conn.cursor()
            conn.current_schema = workflow.db_name
            # Record the start time; it is the LogMiner start point for the backup.
            cursor.execute("alter session set nls_date_format='yyyy-mm-dd hh24:mi:ss'")
            cursor.execute("select sysdate from dual")
            begin_time = cursor.fetchone()[0]
            # Execute the statements one by one, appending each outcome.
            for sqlitem in sqlitemList:
                statement = sqlitem.statement
                if sqlitem.stmt_type == "SQL":
                    statement = statement.rstrip(";")
                # For DDL workflows, fetch the object's current definition and
                # store it as undo_sql in sql_rollback.
                # Requires: grant execute on dbms_metadata to xxxxx
                if workflow.syntax_type == 1:
                    object_name = self.get_sql_first_object_name(statement)
                    back_obj_sql = """select dbms_metadata.get_ddl(object_type,object_name,owner)
                    from all_objects
                    where (object_name = upper(:first_object_name) or object_name = :item_object_name)
                    and owner = :owner"""
                    cursor.execute(
                        back_obj_sql,
                        {
                            "first_object_name": object_name,
                            "item_object_name": sqlitem.object_name,
                            "owner": workflow.db_name,
                        },
                    )
                    # metdata_backup consumes the rows pending on ``cursor``.
                    self.metdata_backup(workflow, cursor, statement)

                with FuncTimer() as t:
                    if statement != "":
                        cursor.execute(statement)
                        conn.commit()

                # Capture now: the compile check below reuses the cursor and
                # would otherwise overwrite the statement's row count.
                rowcount = cursor.rowcount
                stagestatus = "Execute Successfully"
                if (
                    sqlitem.stmt_type == "PLSQL"
                    and sqlitem.object_name
                    and sqlitem.object_name != "ANONYMOUS"
                ):
                    query_obj_sql = """SELECT OBJECT_NAME, STATUS, TO_CHAR(LAST_DDL_TIME, 'YYYY-MM-DD HH24:MI:SS') FROM ALL_OBJECTS
                                         WHERE OWNER = :owner
                                         AND OBJECT_NAME = :object_name"""
                    cursor.execute(
                        query_obj_sql,
                        {
                            "owner": sqlitem.object_owner,
                            "object_name": sqlitem.object_name,
                        },
                    )
                    row = cursor.fetchone()
                    qualified_name = f"{sqlitem.object_owner}.{sqlitem.object_name}"
                    if not row:
                        raise Exception(
                            f"Compile Failed. Object {qualified_name} doesn't exist."
                        )
                    if row[1] == "INVALID":
                        raise Exception(
                            f"Compile Failed. Object {qualified_name} is invalid."
                        )

                execute_result.rows.append(
                    ReviewResult(
                        id=line,
                        errlevel=0,
                        stagestatus=stagestatus,
                        errormessage="None",
                        sql=statement,
                        affected_rows=rowcount,
                        execute_time=t.cost,
                    )
                )
                line += 1
        except Exception as e:
            logger.warning(
                f"Oracle命令执行报错,工单id:{workflow.id},语句:{statement or sql}, 错误信息:{traceback.format_exc()}"
            )
            execute_result.error = str(e)
            # Record the failing statement in the result.
            execute_result.rows.append(
                ReviewResult(
                    id=line,
                    errlevel=2,
                    stagestatus="Execute Failed",
                    errormessage=f"异常信息:{e}",
                    sql=statement or sql,
                    affected_rows=0,
                    execute_time=0,
                )
            )
            line += 1
            # Statements after the failing one are reported as reviewed but not executed.
            for sqlitem in sqlitemList[line - 1 :]:
                execute_result.rows.append(
                    ReviewResult(
                        id=line,
                        errlevel=0,
                        stagestatus="Audit completed",
                        errormessage="前序语句失败, 未执行",
                        sql=sqlitem.statement,
                        affected_rows=0,
                        execute_time=0,
                    )
                )
                line += 1
        finally:
            # Backup: mine the rollback SQL for the time window of this run.
            if workflow.is_backup:
                if cursor is None or begin_time is None:
                    logger.error(
                        f"Oracle工单备份异常,工单id:{workflow.id}, 错误信息:未获取到数据库会话或执行开始时间,跳过备份"
                    )
                else:
                    try:
                        cursor.execute("select sysdate from dual")
                        end_time = cursor.fetchone()[0]
                        self.backup(
                            workflow,
                            cursor=cursor,
                            begin_time=begin_time,
                            end_time=end_time,
                        )
                    except Exception:
                        logger.error(
                            f"Oracle工单备份异常,工单id:{workflow.id}, 错误信息:{traceback.format_exc()}"
                        )
            if close_conn:
                self.close()
        return execute_result

    def backup(self, workflow, cursor, begin_time, end_time):
        """Mine redo/undo SQL with LogMiner and store it in the backup database.

        Notes carried over from the original author (Jan.song, 2020-04-02):
        the executing Oracle user needs ``grant select any transaction`` and
        ``grant execute on dbms_logmnr``; the database must have minimal
        supplemental logging enabled (``alter database add supplemental log data``)
        and run in archive-log mode. Supplemental logging increases redo volume,
        so archive disk space and redo I/O should be evaluated.

        :param workflow: workflow object; its ``id`` links backup rows to the workflow
        :param cursor: cursor of the Oracle session that executed the SQL
        :param begin_time: time the SQL execution started
        :param end_time: time the SQL execution finished
        :return: True on success, False if any step failed (the error is logged)
        """
        # Bound before ``try`` so the ``finally`` clause is safe even when the
        # backup connection could not be opened.
        conn = None
        try:
            # Backups share the MySQL backup instance: database ``ora_backup``,
            # table ``sql_rollback``, rows keyed by workflow id.
            workflow_id = workflow.id
            conn = self.get_backup_connection()
            backup_cursor = conn.cursor()
            backup_cursor.execute("""create database if not exists ora_backup;""")
            backup_cursor.execute("use ora_backup;")
            backup_cursor.execute(
                """CREATE TABLE if not exists `sql_rollback` (
                                       `id` bigint(20) NOT NULL AUTO_INCREMENT,
                                       `redo_sql` mediumtext,
                                       `undo_sql` mediumtext,
                                       `workflow_id` bigint(20) NOT NULL,
                                        PRIMARY KEY (`id`),
                                        key `idx_sql_rollback_01` (`workflow_id`)
                                     ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;"""
            )
            # Use LogMiner to extract the rollback SQL for the execution window.
            logmnr_start_sql = f"""begin
                                        dbms_logmnr.start_logmnr(
                                        starttime=>to_date('{begin_time}','yyyy-mm-dd hh24:mi:ss'),
                                        endtime=>to_date('{end_time}','yyyy/mm/dd hh24:mi:ss'),
                                        options=>dbms_logmnr.dict_from_online_catalog + dbms_logmnr.continuous_mine);
                                    end;"""
            logmnr_contents_sql = """select 
                           xmlagg(xmlparse(content sql_redo wellformed)  order by  scn,rs_id,ssn,rownum).getclobval() ,
                           xmlagg(xmlparse(content sql_undo wellformed)  order by  scn,rs_id,ssn,rownum).getclobval() 
                           from v$logmnr_contents
                           where  SEG_OWNER not in ('SYS')
                           and session# = (select sid from v$mystat where rownum = 1)
                           and serial# = (select serial# from v$session s where s.sid = (select sid from v$mystat where rownum = 1 ))  
                           group by  scn,rs_id,ssn  order by scn desc"""
            logmnr_end_sql = """begin
                                    dbms_logmnr.end_logmnr;
                                 end;"""
            cursor.execute(logmnr_start_sql)
            try:
                cursor.execute(logmnr_contents_sql)
                rows = cursor.fetchall()
            finally:
                # Always end the LogMiner session once it has been started.
                cursor.execute(logmnr_end_sql)
            # Let the driver quote the values: manual escaping of "'" alone
            # mangles SQL text that contains backslashes.
            insert_sql = (
                "insert into sql_rollback(redo_sql,undo_sql,workflow_id) "
                "values(%s,%s,%s);"
            )
            for row in rows:
                redo_sql = f"{row[0]}"
                undo_sql = " " if row[1] is None else f"{row[1]}"
                backup_cursor.execute(insert_sql, (redo_sql, undo_sql, workflow_id))
        except Exception:
            logger.warning(f"备份失败,错误信息{traceback.format_exc()}")
            return False
        finally:
            # Close the backup connection.
            if conn:
                conn.close()
        return True

    def metdata_backup(self, workflow, cursor, redo_sql):
        """Store the rows pending on ``cursor`` as undo SQL for ``redo_sql``.

        Each fetched row's first column is written to ``ora_backup.sql_rollback``
        as ``undo_sql`` next to the statement being executed. The caller is
        expected to have executed the metadata query on ``cursor`` already; this
        method only fetches from it.

        :param workflow: workflow object; its ``id`` links backup rows to the workflow
        :param cursor: cursor of the current session holding the metadata rows
        :param redo_sql: the SQL statement being executed
        :return: True on success, False if any step failed (the error is logged)
        """
        # Bound before ``try`` so the ``finally`` clause is safe even when the
        # backup connection could not be opened.
        conn = None
        try:
            # Backups share the MySQL backup instance: database ``ora_backup``,
            # table ``sql_rollback``, rows keyed by workflow id.
            workflow_id = workflow.id
            conn = self.get_backup_connection()
            backup_cursor = conn.cursor()
            backup_cursor.execute("""create database if not exists ora_backup;""")
            backup_cursor.execute("use ora_backup;")
            backup_cursor.execute(
                """CREATE TABLE if not exists `sql_rollback` (
                                       `id` bigint(20) NOT NULL AUTO_INCREMENT,
                                       `redo_sql` mediumtext,
                                       `undo_sql` mediumtext,
                                       `workflow_id` bigint(20) NOT NULL,
                                        PRIMARY KEY (`id`),
                                        key `idx_sql_rollback_01` (`workflow_id`)
                                     ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;"""
            )
            # Parameterized insert: redo_sql is arbitrary DDL and routinely
            # contains quotes, which broke the previous string-built statement.
            insert_sql = (
                "insert into sql_rollback(redo_sql,undo_sql,workflow_id) "
                "values(%s,%s,%s);"
            )
            for row in cursor.fetchall():
                undo_sql = " " if row[0] is None else f"{row[0]}"
                backup_cursor.execute(insert_sql, (redo_sql, undo_sql, workflow_id))
        except Exception:
            logger.warning(f"备份失败,错误信息{traceback.format_exc()}")
            return False
        finally:
            # Close the backup connection.
            if conn:
                conn.close()
        return True

    def get_rollback(self, workflow):
        """Return the stored rollback statements of a workflow.

        Added by Jan.song, 2020-04-02. Rows are read from
        ``ora_backup.sql_rollback`` ordered by ``id``.

        :param workflow: workflow object; its ``id`` selects the backup rows
        :return: list of ``[redo_sql, undo_sql]`` pairs (original statement,
            rollback statement)
        :raises Exception: if the backup database cannot be queried
        """
        conn = None
        list_backup_sql = []
        try:
            # Open the backup connection.
            conn = self.get_backup_connection()
            cur = conn.cursor()
            sql = "select redo_sql,undo_sql from sql_rollback where workflow_id = %s order by id;"
            cur.execute("use ora_backup;")
            cur.execute(sql, (workflow.id,))
            # Build the list of [original statement, rollback statement] pairs.
            list_backup_sql = [[row[0], row[1]] for row in cur.fetchall()]
        except Exception as e:
            logger.error(f"获取回滚语句报错,异常信息{traceback.format_exc()}")
            raise Exception(e) from e
        finally:
            # Close the connection on every path, including query failures.
            if conn:
                conn.close()
        return list_backup_sql

    def sqltuningadvisor(self, db_name=None, sql="", close_conn=True, **kwargs):
        """Run the SQL Tuning Advisor (DBMS_SQLTUNE) on a statement.

        Added by Jan.song, 2020-04-21. The executing user needs the advisor role.
        A tuning task named after the current thread id is created, executed,
        reported and finally dropped.

        :param db_name: passed to DBMS_SQLTUNE.CREATE_TUNING_TASK as ``user_name``
        :param sql: statement to analyse; a trailing ``;`` is stripped
        :param close_conn: close the engine connection when finished
        :return: ResultSet holding the tuning report, or ``error`` on failure
        """
        result_set = ResultSet(full_sql=sql)
        task_name = f"sqlaudit{threading.current_thread().ident}"
        task_begin = 0
        cursor = None
        try:
            conn = self.get_connection()
            cursor = conn.cursor()
            sql = sql.rstrip(";")
            # Create and execute the tuning task.
            create_task_sql = """DECLARE
                                  my_task_name VARCHAR2(30);
                                  my_sqltext  CLOB;
                                  BEGIN
                                  my_sqltext := :sql;
                                  my_task_name := DBMS_SQLTUNE.CREATE_TUNING_TASK(
                                  sql_text    => my_sqltext,
                                  user_name   => :db_name,
                                  scope       => 'COMPREHENSIVE',
                                  time_limit  => 30,
                                  task_name   => :task_name,
                                  description => 'tuning');
                                  DBMS_SQLTUNE.EXECUTE_TUNING_TASK( task_name => :task_name);
                                  END;"""
            # Set before executing: the block may create the task and then fail
            # while running it, in which case the task still has to be dropped.
            task_begin = 1
            cursor.execute(
                create_task_sql,
                {"sql": sql, "db_name": db_name, "task_name": task_name},
            )
            # Fetch the tuning report.
            get_task_sql = (
                """select DBMS_SQLTUNE.REPORT_TUNING_TASK(:task_name) from dual"""
            )
            cursor.execute(get_task_sql, {"task_name": task_name})
            fields = cursor.description
            if any(x[1] == cx_Oracle.CLOB for x in fields):
                # Materialize LOB values into plain Python values.
                rows = [
                    tuple(c.read() if isinstance(c, cx_Oracle.LOB) else c for c in r)
                    for r in cursor
                ]
            else:
                rows = cursor.fetchall()
            result_set.column_list = [i[0] for i in fields] if fields else []
            result_set.rows = [tuple(x) for x in rows]
            result_set.affected_rows = len(result_set.rows)
        except Exception as e:
            logger.warning(
                f"Oracle 语句执行报错,语句:{sql},错误信息{traceback.format_exc()}"
            )
            result_set.error = str(e)
        finally:
            # Drop the tuning task. This is best-effort: if task creation itself
            # failed there is nothing to drop, and an error here must neither
            # hide the result nor skip closing the connection.
            if task_begin == 1 and cursor is not None:
                try:
                    cursor.execute(
                        """begin
                             dbms_sqltune.drop_tuning_task(:task_name);
                             end;""",
                        {"task_name": task_name},
                    )
                except Exception:
                    logger.warning(
                        f"Oracle 清理SQL调优任务失败,任务:{task_name},错误信息{traceback.format_exc()}"
                    )
            if close_conn:
                self.close()
        return result_set

    def execute(self, db_name=None, sql="", close_conn=True, parameters=None):
        """Run one or more raw SQL statements without fetching any rows.

        ``sql`` is split with ``sqlparse.split`` and the statements are run in
        order on a single cursor, each with trailing ``;`` characters removed.
        The first failing statement aborts the remaining ones. No commit is
        issued by this method.

        :param db_name: passed through to ``get_connection``.
        :param sql: SQL text, possibly containing several statements.
        :param close_conn: when true, ``self.close()`` is called before returning.
        :param parameters: bind values handed to every statement; an empty
            list is used when this is falsy.
        :return: a ``ResultSet`` created with ``full_sql=sql``; its ``error``
            attribute is set to ``str(exception)`` if execution failed.
        """
        result = ResultSet(full_sql=sql)
        conn = self.get_connection(db_name=db_name)
        try:
            cursor = conn.cursor()
            try:
                for statement in sqlparse.split(sql):
                    cursor.execute(statement.rstrip(";"), parameters or [])
            finally:
                # Release the cursor explicitly: with close_conn=False the
                # connection stays open and would otherwise keep it alive.
                cursor.close()
        except Exception as e:
            logger.warning(
                f"Oracle语句执行报错,语句:{sql},错误信息{traceback.format_exc()}"
            )
            result.error = str(e)
        finally:
            # Done in ``finally`` so the connection is released on every exit path.
            if close_conn:
                self.close()
        return result

    def processlist(self, command_type, **kwargs):
        """Fetch session information, filtered by session status.

        :param command_type: ``"All"`` (no status filter), ``"Active"``
            (``s.status = 'ACTIVE'``) or ``"Others"`` (``s.status != 'ACTIVE'``).
            A falsy value is treated as ``"Active"``; any other value results
            in an empty SQL string being passed to ``self.query``.
        :param kwargs: accepted but not used by this method.
        :return: whatever ``self.query`` returns for the generated SQL.
        """
        base_sql = """select 
                       s.sid,
                       s.serial#,
                       s.status,
                       s.username,
                       q.sql_text,
                       q.sql_fulltext,
                       s.machine,
                       s.sql_exec_start
                    from v$process p, v$session s, v$sqlarea q 
                    where p.addr = s.paddr  
                       and s.sql_hash_value = q.hash_value"""
        # Each recognised filter name maps to a template wrapped around base_sql.
        templates = (
            ("All", "{};"),
            ("Active", "{} and s.status = 'ACTIVE';"),
            ("Others", "{} and s.status != 'ACTIVE';"),
        )
        wanted = command_type or "Active"
        statement = ""
        for label, template in templates:
            if wanted == label:
                statement = template.format(base_sql)
                break

        return self.query(sql=statement)

    def get_kill_command(self, thread_ids):
        """Build ``alter system kill session`` statements for the given sessions.

        The statements are generated by a lookup query against ``v$process``,
        ``v$session`` and ``v$sqlarea``, so only sessions matched by that query
        produce a statement.

        :param thread_ids: list of ``[sid, serial#]`` pairs,
            e.g. ``[[sid, serial#], [sid, serial#]]``.
        :return: ``None`` when any element of any pair is not an ``int``;
            otherwise the generated statements concatenated into one string
            (empty when ``thread_ids`` is empty or no session matched).
        """
        if not thread_ids:
            # An empty ``in ()`` list is not valid SQL, so skip the lookup.
            return ""
        # Only integers are allowed: the values are interpolated into SQL text.
        if any(not isinstance(part, int) for pair in thread_ids for part in pair):
            return None
        session_keys = ",".join(
            f"'{str(pair[0])},{str(pair[1])}'" for pair in thread_ids
        )
        sql = """select 'alter system kill session ' || '''' || s.sid || ',' || s.serial# || '''' || ' immediate' || ';'
                 from v$process p, v$session s, v$sqlarea q
                 where p.addr = s.paddr
                 and s.sql_hash_value = q.hash_value
                 and s.sid || ',' || s.serial# in ({});""".format(
            session_keys
        )
        all_kill_sql = self.query(sql=sql)
        # Each row holds one complete, ';'-terminated kill statement.
        return "".join(row[0] for row in all_kill_sql.rows)

    def kill_session(self, thread_ids):
        """Kill the sessions identified by ``thread_ids``.

        Validation and statement generation are delegated to
        ``get_kill_command`` so both methods share one implementation; the
        resulting statements are then run through ``self.execute``.

        :param thread_ids: list of ``[sid, serial#]`` pairs,
            e.g. ``[[sid, serial#], [sid, serial#]]``.
        :return: an empty ``ResultSet(full_sql="")`` when the input is rejected
            by validation; otherwise the result of ``self.execute``.
        """
        kill_sql = self.get_kill_command(thread_ids)
        if kill_sql is None:
            # get_kill_command returns None for pairs containing non-integers.
            return ResultSet(full_sql="")
        return self.execute(sql=kill_sql)

    def tablespace(self, offset=0, row_count=14):
        """Fetch one page of tablespace usage, largest total space first.

        Each row carries ``rownumber``, ``tablespace_name``,
        ``tablespace_type``, ``status``, ``total_space``, ``used_space`` and
        ``pct_used``. The space columns are byte counts divided by 1024*1024
        (presumably MB -- confirm against the ``sys.sm$ts_*`` views).

        :param offset: number of leading rows to skip (0 for the first page).
        :param row_count: maximum number of rows in the page.
        :return: whatever ``self.query`` returns for the paging query.
        """
        # The inner query keeps rows 1..last_row; the outer filter drops the
        # first ``offset`` of them.
        last_row = offset + row_count
        # The outer comparison must be strict: ``>=`` would repeat the last
        # row of the previous page and return row_count + 1 rows.
        sql = """
        select f.* from (
            select rownum rownumber, e.* from (
                select a.tablespace_name,
                d.contents tablespace_type,
                d.status,
                round(a.bytes/1024/1024,2) total_space,
                round(b.bytes/1024/1024,2) used_space,
                round((b.bytes * 100) / a.bytes,2) pct_used
                from sys.sm$ts_avail a, sys.sm$ts_used b, sys.sm$ts_free c, dba_tablespaces d
                where a.tablespace_name = b.tablespace_name
                and a.tablespace_name = c.tablespace_name
                and a.tablespace_name = d.tablespace_name
                order by total_space desc ) e
                where rownum <=:row_count
        ) f where f.rownumber > :offset;"""
        return self.query(
            sql=sql, parameters={"row_count": last_row, "offset": offset}
        )

    def tablespace_count(self):
        """Count the rows of ``dba_tablespaces`` whose contents are not 'TEMPORARY'.

        :return: whatever ``self.query`` returns for the count query.
        """
        count_sql = (
            "select count(*) from dba_tablespaces where contents != 'TEMPORARY'"
        )
        return self.query(sql=count_sql)

    def lock_info(self):
        """Fetch information about currently locked objects.

        Joins ``v$locked_object``, ``dba_objects``, ``v$session`` and
        ``v$sqlarea`` to report, per locked object, its owner and name, the
        lock mode, and the session (sid, serial#, machine, user) together with
        the SQL text matched through the session's ``sql_hash_value``.

        :return: whatever ``self.query`` returns for the lock query.
        """
        lock_sql = """
        select c.username,
               b.owner object_owner,
               a.object_id,
               b.object_name,
               a.locked_mode,
               c.sid related_sid,
               c.serial# related_serial#,
               c.machine,
               d.sql_text related_sql,
               d.sql_fulltext related_sql_full,
               c.sql_exec_start related_sql_exec_start
        from v$locked_object a,dba_objects b, v$session c, v$sqlarea d
        where b.object_id = a.object_id
        and a.session_id = c.sid
        and c.sql_hash_value = d.hash_value;"""
        locks = self.query(sql=lock_sql)
        return locks

    def close(self):
        """Close the cached connection, if there is one, and forget it."""
        if not self.conn:
            # Nothing is open; leave ``self.conn`` untouched.
            return
        self.conn.close()
        # Reset so the next get_connection() call opens a fresh connection.
        self.conn = None