/* -------------------------------------------------------------------------
 *  This file is part of the oGRAC project.
 * Copyright (c) 2024 Huawei Technologies Co.,Ltd.
 *
 * oGRAC is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * dcl_alter_parser.c
 *
 *
 * IDENTIFICATION
 * src/ogsql/parser/dcl_alter_parser.c
 *
 * -------------------------------------------------------------------------
 */

#include "dcl_alter_parser.h"
#include "srv_instance.h"
#include "cbo_base.h"
#include "ogsql_privilege.h"
#include "ddl_parser.h"
#include "srv_param_common.h"
#include "srv_params_raft_and_log.h"

#ifdef __cplusplus
extern "C" {
#endif

static status_t sql_parse_alsys_switch(lex_t *lex, knl_alter_sys_def_t *def, word_t *word)
{
    if (lex_fetch(lex, word) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if ((key_wid_t)word->id != KEY_WORD_LOGFILE) {
        OG_SRC_THROW_ERROR_EX(word->text.loc, ERR_SQL_SYNTAX_ERROR, "LOGFILE expected but %s found", W2S(word));
        return OG_ERROR;
    }
    if (lex_expected_end(lex) != OG_SUCCESS) {
        return OG_ERROR;
    }

    def->action = ALTER_SYS_SWITCHLOG;
    return OG_SUCCESS;
}

static status_t sql_parse_match_config(knl_session_t *se, knl_alter_sys_def_t *def, lex_t *lex)
{
    config_item_t *item = NULL;
    if (IS_LOG_MODE(def->param)) {
        text_t name = {
            .str = "_LOG_LEVEL",
            .len = sizeof("_LOG_LEVEL") - 1
        };
        item = cm_get_config_item(GET_CONFIG, &name, OG_TRUE);
    } else {
        text_t name = {
            .str = def->param,
            .len = (uint32)strlen(def->param)
        };
        item = cm_get_config_item(GET_CONFIG, &name, OG_TRUE);
    }

    if (item == NULL) {
        OG_SRC_THROW_ERROR(lex->loc, ERR_INVALID_PARAMETER_NAME, def->param);
        return OG_ERROR;
    }

    def->param_id = item->id;

    if (se->kernel->db.ctrl.core.lrep_mode == LOG_REPLICATION_ON &&
        strcmp(def->param, "ARCH_TIME") == 0) {
        OG_THROW_ERROR(ERR_NOT_COMPATIBLE, "arch time while lrep_mode is LOG_REPLICATION_ON");
        return OG_ERROR;
    }

    /* VERIFY SET VALUE HERE. */
    if ((item->verify) && (item->verify((knl_handle_t)se, (void *)lex, (void *)def) != OG_SUCCESS)) {
        return OG_ERROR;
    }
    return OG_SUCCESS;
}

static status_t sql_bison_normalize_sys_param_value(config_item_t *item, knl_alter_sys_def_t *def)
{
    if (IS_LOG_MODE(def->param)) {
        return sql_bison_normalize_als_log_level(def);
    }

    if (item->datatype == NULL) {
        return OG_SUCCESS;
    }

    if (item->verify == sql_verify_als_onoff) {
        if (cm_str_equal_ins(def->value, "ON")) {
            def->value[0] = (char)OG_TRUE;
            def->value[1] = '\0';
            return OG_SUCCESS;
        }

        if (cm_str_equal_ins(def->value, "OFF")) {
            def->value[0] = (char)OG_FALSE;
            def->value[1] = '\0';
            return OG_SUCCESS;
        }

        OG_THROW_ERROR_EX(ERR_SQL_SYNTAX_ERROR, "invalid parameter value");
        return OG_ERROR;
    }

    /*
     * Native ALTER SYSTEM boolean verification stores OG_FALSE/OG_TRUE in
     * def->value[0]. The bison path keeps textual values first, so normalize
     * them here before the shared notify callbacks consume def->value.
     */
    if (!cm_str_equal_ins(item->datatype, "OG_TYPE_BOOLEAN")) {
        return OG_SUCCESS;
    }

    if (cm_str_equal_ins(def->value, "TRUE")) {
        def->value[0] = (char)OG_TRUE;
        def->value[1] = '\0';
        return OG_SUCCESS;
    }

    if (cm_str_equal_ins(def->value, "FALSE")) {
        def->value[0] = (char)OG_FALSE;
        def->value[1] = '\0';
        return OG_SUCCESS;
    }

    OG_THROW_ERROR_EX(ERR_SQL_SYNTAX_ERROR, "invalid parameter value");
    return OG_ERROR;
}

static bool32 sql_bison_sys_param_needs_verify(config_item_t *item)
{
    return item->verify != NULL && item->datatype != NULL &&
        !cm_str_equal_ins(item->datatype, "OG_TYPE_BOOLEAN") &&
        !cm_str_equal_ins(item->datatype, "OG_TYPE_VARCHAR");
}

static status_t sql_bison_verify_sys_param_value(sql_stmt_t *stmt, config_item_t *item, knl_alter_sys_def_t *def)
{
    lex_t *lex = stmt->session->lex;
    uint32 save_flags = lex->flags;
    status_t status;
    sql_text_t value_text;

    value_text.str = def->value;
    value_text.len = (uint32)strlen(def->value);
    value_text.loc = (source_location_t){ 1, 1 };
    value_text.implicit = OG_FALSE;

    if (lex_push(lex, &value_text) != OG_SUCCESS) {
        return OG_ERROR;
    }

    status = item->verify((knl_handle_t)&stmt->session->knl_session, (void *)lex, (void *)def);
    if (status == OG_SUCCESS) {
        status = lex_expected_end(lex);
    }
    lex->flags = save_flags;
    lex_pop(lex);
    return status;
}

static status_t sql_parse_alsys_modify_replica(lex_t *lex, knl_alter_sys_def_t *def)
{
    word_t word;

    if (lex_expected_fetch((lex_t *)lex, &word) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (word.type == WORD_TYPE_STRING) {
        sql_remove_quota(&word.text.value);
    }

    cm_trim_text((text_t *)&word.text);
    cm_text2str((text_t *)&word.text, def->value, OG_PARAM_BUFFER_SIZE);

    def->action = ALTER_SYS_MODIFY_REPLICA;
    return lex_expected_end(lex);
}

static status_t sql_parse_replication_clause(knl_alter_sys_def_t *sys_def, lex_t *lex)
{
    status_t status;
    word_t word;

    status = lex_expected_fetch(lex, &word);
    OG_RETURN_IFERR(status);

    switch ((key_wid_t)word.id) {
        case KEY_WORD_ON:
            status = sql_parse_alsys_modify_replica(lex, sys_def);
            break;
        case KEY_WORD_OFF:
            sys_def->action = ALTER_SYS_STOP_REPLICA;
            status = lex_expected_end(lex);
            break;
        default:
            OG_SRC_THROW_ERROR_EX(word.text.loc, ERR_SQL_SYNTAX_ERROR, "key word expected but %s found", W2S(&word));
            return OG_ERROR;
    }

    return status;
}

static status_t sql_parse_alsys_set(session_t *session, knl_alter_sys_def_t *sys_def, word_t *word)
{
    status_t status;

    lex_t *lex = session->lex;

    sys_def->is_coord_conn = IS_COORD_CONN(session);
    status = lex_expected_fetch_variant(lex, word);
    OG_RETURN_IFERR(status);

    if ((key_wid_t)word->id == KEY_WORD_REPLICATION) {
        return sql_parse_replication_clause(sys_def, lex);
    }

    sys_def->action = ALTER_SYS_SET_PARAM;
    OG_RETURN_IFERR(cm_text2str((text_t *)&word->text, sys_def->param, OG_NAME_BUFFER_SIZE));
    cm_str_upper(sys_def->param);

    status = lex_expected_fetch_word(lex, "=");
    OG_RETURN_IFERR(status);

    status = sql_parse_match_config(&session->knl_session, sys_def, lex);
    OG_RETURN_IFERR(status);

    OG_RETURN_IFERR(sql_parse_scope_clause(sys_def, lex));

    return lex_expected_end(lex);
}

static status_t sql_parse_alsys_load(sql_stmt_t *stmt, knl_alter_sys_def_t *sys_def, word_t *word)
{
    lex_t *lex = stmt->session->lex;
    text_t user;
    text_t name;

    lex->flags = LEX_WITH_OWNER;

    if (lex_expected_fetch_word(lex, "DICTIONARY") != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (lex_expected_fetch_word(lex, "FOR") != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (lex_expected_fetch_variant(lex, word) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (sql_convert_object_name(stmt, word, &user, NULL, &name) != OG_SUCCESS) {
        return OG_ERROR;
    }

    OG_RETURN_IFERR(cm_text2str(&user, sys_def->param, OG_NAME_BUFFER_SIZE));
    OG_RETURN_IFERR(cm_text2str(&name, sys_def->value, OG_PARAM_BUFFER_SIZE));

    sys_def->action = ALTER_SYS_LOAD_DC;

    return lex_expected_end(lex);
}

static status_t sql_parse_alsys_init(sql_stmt_t *stmt, knl_alter_sys_def_t *sys_def, word_t *word)
{
    lex_t *lex = stmt->session->lex;

    lex->flags = LEX_WITH_OWNER;

    if (lex_expected_fetch_word(lex, "DICTIONARY") != OG_SUCCESS) {
        return OG_ERROR;
    }

    sys_def->action = ALTER_SYS_INIT_ENTRY;

    return lex_expected_end(lex);
}

static status_t sql_parse_alsys_flush(lex_t *lex, knl_alter_sys_def_t *sys_def, word_t *word)
{
    uint32 matched_id;

    if (lex_expected_fetch_1of2(lex, "BUFFER", "SQLPOOL", &matched_id) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (matched_id == 0) {
        sys_def->action = ALTER_SYS_FLUSH_BUFFER;
    } else {
        sys_def->action = ALTER_SYS_FLUSH_SQLPOOL;
    }

    return lex_expected_end(lex);
}

static status_t sql_parse_alsys_recycle(lex_t *lex, knl_alter_sys_def_t *sys_def, word_t *word)
{
    if (lex_expected_fetch_word(lex, "sharedpool") != OG_SUCCESS) {
        return OG_ERROR;
    }

    sys_def->action = ALTER_SYS_RECYCLE_SHAREDPOOL;

    if (lex_try_fetch(lex, "force", &sys_def->force_recycle) != OG_SUCCESS) {
        return OG_ERROR;
    }
    return lex_expected_end(lex);
}

static status_t sql_parse_dump_dest_file(sql_stmt_t *stmt, lex_t *lex, word_t *word, text_t *dest_file)
{
    bool32 result = OG_FALSE;

    if (lex_try_fetch(lex, "TO", &result) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (!result) {
        return OG_SUCCESS;
    }

    if (lex_expected_fetch_string(lex, word) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (sql_copy_text(stmt->context, (text_t *)&word->text, dest_file) != OG_SUCCESS) {
        return OG_ERROR;
    }

    char real_name[OG_MAX_FILE_PATH_LENGH] = { 0x00 };
    if (sql_get_real_path(dest_file, real_name) != OG_SUCCESS) {
        OG_THROW_ERROR_EX(ERR_CAPABILITY_NOT_SUPPORT, "datafile name [%s] fmt", T2S(dest_file));
        return OG_ERROR;
    }
    return OG_SUCCESS;
}

static status_t sql_parse_dump_datafile(sql_stmt_t *stmt, lex_t *lex, knl_alter_sys_def_t *sys_def, word_t *word)
{
    int32 value;

    if (OG_SUCCESS != lex_expected_fetch_int32(lex, &value)) {
        return OG_ERROR;
    }

    if (value < 0) {
        OG_THROW_ERROR_EX(ERR_SQL_SYNTAX_ERROR, "param values should positive");
        return OG_ERROR;
    }

    sys_def->page_id.file = value;

    if (OG_SUCCESS != lex_expected_fetch_word(lex, "PAGE")) {
        OG_SRC_THROW_ERROR_EX(word->loc, ERR_SQL_SYNTAX_ERROR, "page expected but %s found", W2S(word));
        return OG_ERROR;
    }

    if (OG_SUCCESS != lex_expected_fetch_int32(lex, &value)) {
        return OG_ERROR;
    }

    if (value < 0) {
        OG_THROW_ERROR_EX(ERR_SQL_SYNTAX_ERROR, "param values should positive");
        return OG_ERROR;
    }

    sys_def->page_id.page = value;

    if (sql_parse_dump_dest_file(stmt, lex, word, &sys_def->out_file) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return lex_expected_end(lex);
}

static status_t sql_parse_dump_ctrlfile(sql_stmt_t *stmt, lex_t *lex, knl_alter_sys_def_t *sys_def, word_t *word)
{
    sys_def->action = ALTER_SYS_DUMP_CTRLPAGE;
    if (sql_parse_dump_dest_file(stmt, lex, word, &sys_def->out_file) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return lex_expected_end(lex);
}

static status_t sql_parse_alsys_dc_dump(sql_stmt_t *stmt, knl_alter_sys_def_t *sys_def, word_t *word)
{
    status_t status;
    lex_t *lex = stmt->session->lex;
    sys_def->action = ALTER_SYS_DUMP_DC;

    if (OG_SUCCESS != lex_expected_fetch(lex, word)) {
        return OG_ERROR;
    }

    switch (word->id) {
        case KEY_WORD_TABLE:
            sys_def->dump_info.dump_type = DC_DUMP_TABLE;
            stmt->session->lex->flags |= LEX_WITH_OWNER;
            status = lex_expected_fetch_variant(lex, word);
            OG_RETURN_IFERR(status);
            status = sql_convert_object_name(stmt, word, &sys_def->dump_info.user_name,
                                             NULL, &sys_def->dump_info.table_name);
            OG_RETURN_IFERR(status);
            status = sql_check_dump_priv(stmt, sys_def);
            OG_RETURN_IFERR(status);
            break;
        case RES_WORD_USER:
            sys_def->dump_info.dump_type = DC_DUMP_USER;

            status = lex_expected_fetch_variant(lex, word);
            OG_RETURN_IFERR(status);
            status = sql_copy_prefix_tenant(stmt, (text_t *)&word->text, &sys_def->dump_info.user_name, sql_copy_name);
            OG_RETURN_IFERR(status);
            status = sql_check_dump_priv(stmt, sys_def);
            OG_RETURN_IFERR(status);
            break;
        default:
            OG_SRC_THROW_ERROR_EX(word->loc, ERR_SQL_SYNTAX_ERROR, "table/user expected but %s found", W2S(word));
            return OG_ERROR;
    }

    if (sql_parse_dump_dest_file(stmt, lex, word, &sys_def->dump_info.dump_file) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return lex_expected_end(lex);
}

static status_t sql_parse_alsys_dump(sql_stmt_t *stmt, lex_t *lex, knl_alter_sys_def_t *def, word_t *word)
{
    status_t status;

    def->action = ALTER_SYS_DUMP_PAGE;

    if (OG_SUCCESS != lex_expected_fetch(lex, word)) {
        return OG_ERROR;
    }

    switch (word->id) {
        case KEY_WORD_DATAFILE:
            status = sql_parse_dump_datafile(stmt, lex, def, word);
            break;
        case KEY_WORD_CTRLFILE:
            status = sql_parse_dump_ctrlfile(stmt, lex, def, word);
            break;
        case KEY_WORD_CATALOG:
            status = sql_parse_alsys_dc_dump(stmt, def, word);
            break;
        default:
            OG_SRC_THROW_ERROR_EX(word->loc, ERR_SQL_SYNTAX_ERROR, "datafile expected but %s found", W2S(word));
            status = OG_ERROR;
            break;
    }

    return status;
}

static inline status_t sql_parse_int_until(int32 *value, text_t *text, const char *until)
{
    uint32 loop;
    char cval;
    char *beg = text->str;
    if (text->str[0] == '+' || text->str[0] == '-') {
        ++text->str;
        --text->len;
    }
    while (text->str[0] == ' ') {
        ++text->str;
        --text->len;
    }
    if (text->len == 0) {
        return OG_ERROR;
    }
    for (loop = 0; loop < text->len; ++loop) {
        if (until != NULL && text->str[loop] == until[0]) {
            if (loop == 0) {
                return OG_ERROR;
            }
            text->str[loop] = '\0';
            *value = atoi(beg);
            text->str[loop] = ',';
            text->str += (loop + 1);
            text->len -= (loop + 1);
            return OG_SUCCESS;
        }
        if (text->str[loop] > '9' || text->str[loop] < '0') {
            return OG_ERROR;
        }
    }
    if (loop == 0) {
        return OG_ERROR;
    }
    cval = text->str[loop]; // may exceed bound, but it does not matter
    text->str[loop] = '\0';
    *value = atoi(beg);
    text->str[loop] = cval;
    text->str += loop;
    text->len -= loop;
    return OG_SUCCESS;
}

// if the count of splitter in text is num,return 1,else return 0
static inline bool32 sql_parse_num_match_splitter(const text_t *text, int num, char split)
{
    if (text->str == NULL || text->len <= 0) {
        return 0;
    }
    int count = 0;
    for (uint32 i = 0; i < text->len; i++) {
        if (text->str[i] == split) {
            count++;
        }
    }
    return (bool32)(count == num ? 1 : 0);
}

static inline status_t sql_parse_sid_serial(word_t *word, uint32 *sid, uint32 *serial, uint32 *nodeid)
{
    text_t text;
    text_t text1;
    int32 arrint0;
    int32 arrint1;
    int32 arrint2 = 0;
    text.str = word->text.str;
    text.len = word->text.len;
    text1.str = word->text.str;
    text1.len = word->text.len;

    if (OG_SUCCESS != sql_parse_int_until(&arrint0, &text, ",")) {
        OG_SRC_THROW_ERROR_EX(word->loc, ERR_SQL_SYNTAX_ERROR, "invalid session id '%s'", T2S(&word->text));
        return OG_ERROR;
    }

    if (OG_SUCCESS != sql_parse_int_until(&arrint1, &text, ",")) {
        OG_SRC_THROW_ERROR_EX(word->loc, ERR_SQL_SYNTAX_ERROR, "invalid session id '%s'", T2S(&word->text));
        return OG_ERROR;
    }

    if (text.len == 0) {
        // only have sid and serial
        // 2 find two comma
        if (sql_parse_num_match_splitter(&text1, 2, ',')) {
            // The parsing str should have one splitter
            OG_SRC_THROW_ERROR_EX(word->loc, ERR_SQL_SYNTAX_ERROR, "invalid session id '%s'", T2S(&word->text));
            return OG_ERROR;
        }
    } else {
        // have sid and serial and nodeid
        if (!IS_COORDINATOR) {
            OG_SRC_THROW_ERROR_EX(word->loc, ERR_SQL_SYNTAX_ERROR, "not support to kill other session on DN node");
            return OG_ERROR;
        }
        if (text.str[0] != '@') {
            OG_SRC_THROW_ERROR_EX(word->loc, ERR_SQL_SYNTAX_ERROR, "invalid session id '%s'", T2S(&word->text));
            return OG_ERROR;
        }
        ++text.str;
        --text.len;
        if (OG_SUCCESS != sql_parse_int_until(&arrint2, &text, NULL)) {
            OG_SRC_THROW_ERROR_EX(word->loc, ERR_SQL_SYNTAX_ERROR, "invalid session id '%s'", T2S(&word->text));
            return OG_ERROR;
        }
    }

    if (arrint0 < 0 || arrint1 < 0 || arrint2 < 0) {
        OG_SRC_THROW_ERROR_EX(word->loc, ERR_SQL_SYNTAX_ERROR, "invalid session id '%s'", T2S(&word->text));
        return OG_ERROR;
    }

    *sid = (uint32)arrint0;
    *serial = (uint32)arrint1;
    *nodeid = (uint32)arrint2;

    return OG_SUCCESS;
}

static status_t sql_parse_alsys_kill(sql_stmt_t *stmt, lex_t *lex, knl_alter_sys_def_t *sys_def)
{
    word_t word;
    OG_RETURN_IFERR(lex_expected_fetch_word(lex, "SESSION"));
    OG_RETURN_IFERR(lex_expected_fetch_string(lex, &word));
    OG_RETURN_IFERR(sql_parse_sid_serial(&word, &sys_def->session_id, &sys_def->serial_id, &sys_def->node_id));

    sys_def->action = ALTER_SYS_KILL_SESSION;

    return lex_expected_end(lex);
}

static status_t sql_parse_alsys_reset(sql_stmt_t *stmt, lex_t *lex, knl_alter_sys_def_t *def)
{
    def->action = ALTER_SYS_RESET_STATISTIC;
    if (lex_expected_fetch_word(lex, "statistic") != OG_SUCCESS) {
        return OG_ERROR;
    }
    return lex_expected_end(lex);
}

static status_t sql_parse_alsys_checkpoint(lex_t *lex, knl_alter_sys_def_t *def)
{
    uint32 match_id;

    OG_RETURN_IFERR(lex_try_fetch_1of2(lex, "GLOBAL", "LOCAL", &match_id));

    switch (match_id) {
        case 0:
            def->ckpt_type = CKPT_TYPE_GLOBAL;
            break;
        case 1:
        default:
            def->ckpt_type = CKPT_TYPE_LOCAL;
            break;
    }

    OG_RETURN_IFERR(lex_expected_end(lex));

    def->action = ALTER_SYS_CHECKPOINT;
    return OG_SUCCESS;
}

static status_t sql_parse_alsys_arch_set(session_t *session, knl_alter_sys_def_t *def, word_t *word)
{
    uint32 match_id;
    status_t status;
    lex_t *lex = session->lex;

    def->is_coord_conn = IS_COORD_CONN(session);
    status = lex_expected_fetch_variant(lex, word);
    OG_RETURN_IFERR(status);

    if ((key_wid_t)word->id == KEY_WORD_REPLICATION) {
        return sql_parse_replication_clause(def, lex);
    }

    def->action = ALTER_SYS_SET_PARAM;
    OG_RETURN_IFERR(cm_text2str((text_t *)&word->text, def->param, OG_NAME_BUFFER_SIZE));
    cm_str_upper(def->param);

    status = lex_expected_fetch_word(lex, "=");
    OG_RETURN_IFERR(status);

    status = sql_parse_match_config(&session->knl_session, def, lex);
    OG_RETURN_IFERR(status);

    OG_RETURN_IFERR(sql_parse_scope_clause(def, lex));

    OG_RETURN_IFERR(lex_try_fetch_1of2(lex, "GLOBAL", "LOCAL", &match_id));

    switch (match_id) {
        case 0:
            def->arch_set_type = ARCH_SET_TYPE_GLOBAL;
            break;
        case 1:
        default:
            def->arch_set_type = ARCH_SET_TYPE_LOCAL;
            break;
    }

    OG_RETURN_IFERR(lex_expected_end(lex));

    def->action = ALTER_SYS_ARCHIVE_SET;
    return OG_SUCCESS;
}

static status_t sql_parse_reload_conf(lex_t *lex, knl_alter_sys_def_t *def)
{
    uint32 matched_id;
    if (lex_expected_fetch_1of2(lex, "HBA", "PBL", &matched_id) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (matched_id == 0) {
        def->action = ALTER_SYS_RELOAD_HBA;
    } else {
        def->action = ALTER_SYS_RELOAD_PBL;
    }
    if (lex_expected_fetch_word(lex, "CONFIG")) {
        return OG_ERROR;
    }
    return lex_expected_end(lex);
}

static status_t sql_parse_refresh_sysdba_privilege(lex_t *lex, knl_alter_sys_def_t *def)
{
    def->action = ALTER_SYS_REFRESH_SYSDBA;
    if (lex_expected_fetch_word(lex, "SYSDBA") != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (lex_expected_fetch_word(lex, "PRIVILEGE")) {
        return OG_ERROR;
    }
    return lex_expected_end(lex);
}

static status_t sql_parse_alsys_modify_lsnr_addr(lex_t *lex, knl_alter_sys_def_t *def, alsys_action_e action)
{
    word_t word;
    knl_alter_sys_def_t *sys_def = (knl_alter_sys_def_t *)def;

    if (lex_expected_fetch((lex_t *)lex, &word) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (word.type == WORD_TYPE_STRING) {
        sql_remove_quota(&word.text.value);
    }

    cm_trim_text((text_t *)&word.text);
    OG_RETURN_IFERR(cm_text2str((text_t *)&word.text, sys_def->value, OG_PARAM_BUFFER_SIZE));

    if (!cm_check_ip_valid(sys_def->value)) {
        OG_SRC_THROW_ERROR(lex->loc, ERR_TCP_INVALID_IPADDRESS, sys_def->value);
        return OG_ERROR;
    }

    def->action = action;
    return lex_expected_end(lex);
}

static status_t sql_parse_alsys_modify_hba_conf(lex_t *lex, knl_alter_sys_def_t *def, alsys_action_e action)
{
    word_t word;
    knl_alter_sys_def_t *sys_def = (knl_alter_sys_def_t *)def;

    OG_RETURN_IFERR(lex_expected_fetch_word(lex, "ENTRY"));
    if (lex_expected_fetch((lex_t *)lex, &word) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (word.type == WORD_TYPE_STRING) {
        sql_remove_quota(&word.text.value);
    }

    if (word.text.len > HBA_MAX_LINE_SIZE) {
        OG_THROW_ERROR(ERR_LINE_SIZE_TOO_LONG, 1);
        return OG_ERROR;
    }

    cm_trim_text((text_t *)&word.text);
    OG_RETURN_IFERR(cm_text2str((text_t *)&word.text, sys_def->hba_node, HBA_MAX_LINE_SIZE));

    // check whether hba config is legal
    OG_RETURN_IFERR(cm_check_hba_entry_legality(sys_def->hba_node));

    def->action = action;
    return lex_expected_end(lex);
}

static status_t sql_parse_alsys_add_param_node(lex_t *lex, knl_alter_sys_def_t *def)
{
    if (lex_expected_fetch_word(lex, "LSNR_ADDR") == OG_SUCCESS) {
        // ADD LSNR_ADDR
        return sql_parse_alsys_modify_lsnr_addr(lex, def, ALTER_SYS_ADD_LSNR_ADDR);
    } else if (lex_expected_fetch_word(lex, "HBA") == OG_SUCCESS) {
        // ADD HBA ENTRY
        cm_reset_error();
        return sql_parse_alsys_modify_hba_conf(lex, def, ALTER_SYS_ADD_HBA_ENTRY);
    } else {
        cm_reset_error();
        OG_SRC_THROW_ERROR_EX(LEX_LOC, ERR_SQL_SYNTAX_ERROR, "LSNR_ADDR or HBA expected");
        return OG_ERROR;
    }
}

static status_t sql_parse_alsys_del_param_node(lex_t *lex, knl_alter_sys_def_t *def)
{
    if (lex_expected_fetch_word(lex, "LSNR_ADDR") == OG_SUCCESS) {
        // DELETE LSNR_ADDR
        return sql_parse_alsys_modify_lsnr_addr(lex, def, ALTER_SYS_DELETE_LSNR_ADDR);
    } else if (lex_expected_fetch_word(lex, "HBA") == OG_SUCCESS) {
        // DELETE HBA ENTRY
        cm_reset_error();
        return sql_parse_alsys_modify_hba_conf(lex, def, ALTER_SYS_DEL_HBA_ENTRY);
    } else {
        cm_reset_error();
        OG_SRC_THROW_ERROR_EX(LEX_LOC, ERR_SQL_SYNTAX_ERROR, "LSNR_ADDR or HBA expected");
        return OG_ERROR;
    }
}

static status_t sql_parse_alsys_debug_mode(session_t *session, knl_alter_sys_def_t *def, word_t *word)
{
    debug_config_item_t *debug_params = NULL;
    debug_config_item_t *item = NULL;
    uint32 count;
    lex_t *lex = session->lex;
    knl_session_t *se = &session->knl_session;

    def->action = ALTER_SYS_DEBUG_MODE;

    if (lex_expected_fetch_word(lex, "MODE") != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (lex_expected_fetch_variant(lex, word) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (cm_text2str((text_t *)&word->text, def->param, OG_NAME_BUFFER_SIZE) != OG_SUCCESS) {
        return OG_ERROR;
    }
    cm_str_upper(def->param);

    if (lex_expected_fetch_word(lex, "=") != OG_SUCCESS) {
        return OG_ERROR;
    }

    srv_get_debug_config_info(&debug_params, &count);

    for (uint32 i = 0; i < count; i++) {
        if (cm_str_equal_ins(debug_params[i].name, def->param)) {
            item = &debug_params[i];
            break;
        }
    }

    if (item == NULL) {
        OG_THROW_ERROR(ERR_INVALID_PARAMETER_NAME, def->param);
        return OG_ERROR;
    }

    if (item->verify((knl_handle_t)se, (void *)lex, (void *)def) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return lex_expected_end(lex);
}

static status_t sql_parse_alsys_stop(lex_t *lex, knl_alter_sys_def_t *def)
{
    if (lex_expected_fetch_word(lex, "BUILD") != OG_SUCCESS) {
        return OG_ERROR;
    }

    def->action = ALTER_SYS_STOP_BUILD;
    return lex_expected_end(lex);
}

static status_t sql_parse_alsys_repair(lex_t *lex, knl_alter_sys_def_t *def)
{
    if (lex_expected_fetch_word(lex, "CATALOG") != OG_SUCCESS) {
        return OG_ERROR;
    }
    def->action = ALTER_SYS_REPAIR_CATALOG;
    return lex_expected_end(lex);
}

static status_t set_log_parameter(const char *name)
{
    char log_name[OG_LOG_PARAM_CNT][OG_MAX_PARAM_LEN] = {
        { "_BLACKBOX_STACK_DEPTH" },
        { "ALARM_LOG_DIR" },
        { "AUDIT_LEVEL" },
        { "AUDIT_TRAIL_MODE" },
        { "LOG_HOME" },
        { "_LOG_BACKUP_FILE_COUNT" },
        { "_LOG_MAX_FILE_SIZE" },
        { "_LOG_LEVEL" },
        { "_LOG_FILE_PERMISSIONS" },
        { "_LOG_PATH_PERMISSIONS" },
        { "SQL_STAGE_THRESHOLD" },
        { "RAFT_LOG_LEVEL" },
        { "SLOWSQL_STATS_ENABLE" }
    };
    for (uint32 i = 0; i < OG_LOG_PARAM_CNT; i++) {
        if (cm_str_equal(name, log_name[i])) {
            return OG_SUCCESS;
        }
    }
    return OG_ERROR;
}

static status_t sql_parse_alter_system(sql_stmt_t *stmt)
{
    word_t word;
    knl_alter_sys_def_t *sys_def = NULL;
    status_t status;

    stmt->context->type = OGSQL_TYPE_ALTER_SYSTEM;
    SQL_SET_IGNORE_PWD(stmt->session);
    SQL_SET_COPY_LOG(stmt->session, OG_TRUE);
    status = sql_alloc_mem(stmt->context, sizeof(knl_alter_sys_def_t), (void **)&sys_def);
    OG_RETURN_IFERR(status);

    status = lex_expected_fetch(stmt->session->lex, &word);
    OG_RETURN_IFERR(status);

    switch ((key_wid_t)word.id) {
        case KEY_WORD_SWITCH:
            status = sql_parse_alsys_switch(stmt->session->lex, sys_def, &word);
            break;

        case KEY_WORD_SET:
            status = sql_parse_alsys_set(stmt->session, sys_def, &word);
            if (((cm_log_param_instance()->audit_param.audit_level & SQL_AUDIT_DCL) == 0) &&
                set_log_parameter(sys_def->param) == OG_SUCCESS) {
                sql_audit_log(stmt->session, status, OG_FALSE, OG_TRUE);
            }
            break;

        case KEY_WORD_LOAD:
            status = sql_parse_alsys_load(stmt, sys_def, &word);
            break;

        case KEY_WORD_INIT:
            status = sql_parse_alsys_init(stmt, sys_def, &word);
            break;

        case KEY_WORD_FLUSH:
            status = sql_parse_alsys_flush(stmt->session->lex, sys_def, &word);
            break;

        case KEY_WORD_RECYCLE:
            status = sql_parse_alsys_recycle(stmt->session->lex, sys_def, &word);
            break;

        case KEY_WORD_DUMP:
            status = sql_parse_alsys_dump(stmt, stmt->session->lex, sys_def, &word);
            break;

        case KEY_WORD_KILL:
            status = sql_parse_alsys_kill(stmt, stmt->session->lex, sys_def);
            break;

        case KEY_WORD_RESET:
            status = sql_parse_alsys_reset(stmt, stmt->session->lex, sys_def);
            break;

        case KEY_WORD_CHECKPOINT:
            status = sql_parse_alsys_checkpoint(stmt->session->lex, sys_def);
            break;

        case KEY_WORD_ARCHIVE_SET:
            status = sql_parse_alsys_arch_set(stmt->session, sys_def, &word);
            break;

        case KEY_WORD_RELOAD:
            status = sql_parse_reload_conf(stmt->session->lex, sys_def);
            break;

        case KEY_WORD_REFRESH:
            status = sql_parse_refresh_sysdba_privilege(stmt->session->lex, sys_def);
            break;
        case KEY_WORD_ADD:
            status = sql_parse_alsys_add_param_node(stmt->session->lex, sys_def);
            break;
        case KEY_WORD_DELETE:
            status = sql_parse_alsys_del_param_node(stmt->session->lex, sys_def);
            break;
        case KEY_WORD_DEBUG:
            status = sql_parse_alsys_debug_mode(stmt->session, sys_def, &word);
            break;
        case KEY_WORD_STOP:
            status = sql_parse_alsys_stop(stmt->session->lex, sys_def);
            break;
        case KEY_WORD_REPAIR:
            status = sql_parse_alsys_repair(stmt->session->lex, sys_def);
            break;
        default:
            OG_SRC_THROW_ERROR_EX(word.text.loc, ERR_SQL_SYNTAX_ERROR, "key word expected but %s found", W2S(&word));
            return OG_ERROR;
    }

    stmt->context->entry = sys_def;

    return status;
}

typedef struct st_altset_item altset_item_t;
typedef status_t (*sql_value_parser)(sql_stmt_t *stmt, lex_t *lex, altset_def_t *setting, const altset_item_t *item);
struct st_altset_item {
    text_t name;
    altset_type_t type;
    uint32 id;
    sql_value_parser parser;
};

static status_t sql_parse_set_commit_wait_logging(sql_stmt_t *stmt, lex_t *lex, altset_def_t *def,
    const altset_item_t *item)
{
    word_t word;
    OG_RETURN_IFERR(lex_expected_fetch_word(lex, "="));
    do {
        OG_RETURN_IFERR(lex_expected_fetch(lex, &word));
        if (word.type == WORD_TYPE_STRING) {
            LEX_REMOVE_WRAP(&word);
        }
        cm_trim_text(&word.text.value);
        if (cm_text_str_equal_ins(&word.text.value, "WAIT")) {
            def->commit.nowait = OG_FALSE;
        } else if (cm_text_str_equal_ins(&word.text.value, "NOWAIT")) {
            def->commit.nowait = OG_TRUE;
        } else {
            break;
        }
        def->set_type = SET_COMMIT;
        def->commit.action = COMMIT_WAIT;
        return lex_expected_end(lex);
    } while (0);

    OG_SRC_THROW_ERROR_EX(word.loc, ERR_SQL_SYNTAX_ERROR, "invalid parameter value");
    return OG_ERROR;
}

static status_t sql_parse_set_commit_mode(sql_stmt_t *stmt, lex_t *lex, altset_def_t *def, const altset_item_t *item)
{
    word_t word;

    if (lex_expected_fetch_word(lex, "=") != OG_SUCCESS) {
        return OG_ERROR;
    }

    do {
        OG_RETURN_IFERR(lex_expected_fetch(lex, &word));
        if (word.type == WORD_TYPE_STRING || word.type == WORD_TYPE_DQ_STRING) {
            LEX_REMOVE_WRAP(&word);
        } else if (!IS_VARIANT(&word)) {
            break;
        }

        cm_trim_text(&word.text.value);
        if (cm_text_str_equal_ins(&word.text.value, "IMMEDIATE")) {
            def->commit.batch = OG_FALSE;
        } else if (cm_text_str_equal_ins(&word.text.value, "BATCH")) {
            def->commit.batch = OG_TRUE;
        } else {
            break;
        }

        def->set_type = SET_COMMIT;
        def->commit.action = COMMIT_LOGGING;
        return lex_expected_end(lex);
    } while (0);

    OG_SRC_THROW_ERROR_EX(word.loc, ERR_SQL_SYNTAX_ERROR, "invalid parameter value");
    return OG_ERROR;
}

static status_t sql_parse_set_lockwait_timeout(sql_stmt_t *stmt, lex_t *lex, altset_def_t *def,
    const altset_item_t *item)
{
    uint32 value;
    if (lex_expected_fetch_word(lex, "=") != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (lex_expected_fetch_uint32(lex, &value) != OG_SUCCESS) {
        return OG_ERROR;
    }

    def->set_type = SET_LOCKWAIT_TIMEOUT;
    def->lock_wait_timeout.lock_wait_timeout = value;
    return lex_expected_end(lex);
}

static status_t sql_parse_set_curr_schema(sql_stmt_t *stmt, lex_t *lex, altset_def_t *def, const altset_item_t *item)
{
    word_t value;
    char buf[OG_NAME_BUFFER_SIZE];
    text_t schema;

    if (lex_expected_fetch_word(lex, "=") != OG_SUCCESS) {
        return OG_ERROR;
    }

    OG_RETURN_IFERR(lex_expected_fetch(lex, &value));
    def->set_type = SET_SCHEMA;
    if (value.type == WORD_TYPE_STRING) {
        sql_remove_quota(&value.text.value);
        if (value.text.len == 0) {
            OG_SRC_THROW_ERROR(value.loc, ERR_EMPTY_STRING_NOT_ALLOWED);
            return OG_ERROR;
        }
    }

    if (cm_text2str(&value.text.value, buf, OG_NAME_BUFFER_SIZE) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (sql_user_prefix_tenant(stmt->session, buf) != OG_SUCCESS) {
        return OG_ERROR;
    }

    (void)cm_str2text(buf, &schema);
    if (sql_copy_name(stmt->context, &schema, &def->curr_schema) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return lex_expected_end(lex);
}

static status_t sql_parse_set_session_timezone(sql_stmt_t *stmt, lex_t *lex, altset_def_t *def,
    const altset_item_t *item)
{
    word_t value;

    if (lex_expected_fetch_word(lex, "=") != OG_SUCCESS) {
        return OG_ERROR;
    }

    OG_RETURN_IFERR(lex_expected_fetch_string(lex, &value));

    def->set_type = SET_SESSION_TIMEZONE;
    if (sql_copy_name(stmt->context, &value.text.value, &def->timezone_offset_name) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return lex_expected_end(lex);
}

static status_t sql_parse_set_nlsparam(sql_stmt_t *stmt, lex_t *lex, altset_def_t *def, const altset_item_t *item)
{
    word_t word;
    if (lex_expected_fetch_word(lex, "=") != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (lex_expected_fetch_string(lex, &word) != OG_SUCCESS) {
        return OG_ERROR;
    }

    def->set_type = SET_NLS_PARAMS;
    def->nls_seting.id = (nlsparam_id_t)item->id;
    cm_trim_text(&word.text.value);

    if (sql_copy_text(stmt->context, &word.text.value, &def->nls_seting.value) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return lex_expected_end(lex);
}

static const altset_item_t *sql_get_altset_nlsitems(void)
{
    static altset_item_t altset_nls_items[NLS__MAX_PARAM_NUM];
    static bool32 initialized = OG_FALSE; /* for only initializing 1 time */

    if (initialized) {
        return altset_nls_items;
    }
    for (uint32 i = 0; i < NLS__MAX_PARAM_NUM; i++) {
        altset_nls_items[i].name = g_nlsparam_items[i].key;
        altset_nls_items[i].type = SET_NLS_PARAMS;
        altset_nls_items[i].id = g_nlsparam_items[i].id;
        altset_nls_items[i].parser = sql_parse_set_nlsparam;
    }
    initialized = OG_TRUE;
    return altset_nls_items;
}

static inline status_t sql_try_parse_altset_item(sql_stmt_t *stmt, lex_t *lex, word_t *word, const altset_item_t *items,
    uint32 num, altset_def_t *def, bool32 *found)
{
    uint32 i;
    for (i = 0; i < num; i++) {
        if (cm_text_equal_ins(&(word->text.value), &(items[i].name))) {
            if (sql_copy_text(stmt->context, &word->text.value, &def->pkey) != OG_SUCCESS) {
                return OG_ERROR;
            }

            *found = OG_TRUE;
            return items[i].parser(stmt, lex, def, &items[i]);
        }
    }

    *found = OG_FALSE;
    return OG_SUCCESS;
}

static status_t sql_parse_set_show_explain_predicate(sql_stmt_t *stmt, lex_t *lex, altset_def_t *def,
    const altset_item_t *item)
{
    uint32 match_id;

    OG_RETURN_IFERR(lex_expected_fetch_word(lex, "="));
    OG_RETURN_IFERR(lex_expected_fetch_1of2((lex_t *)lex, "FALSE", "TRUE", &match_id));

    def->on_off = (match_id == 0) ? OG_FALSE : OG_TRUE;
    def->set_type = SET_SHOW_EXPLAIN_PREDICATE;

    return lex_expected_end(lex);
}

static status_t sql_parse_set_shd_socket_timeout(sql_stmt_t *stmt, lex_t *lex, altset_def_t *def,
    const altset_item_t *item)
{
    uint32 value;
    if (lex_expected_fetch_word(lex, "=") != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (lex_expected_fetch_uint32(lex, &value) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (value > OG_MAX_TIMEOUT_VALUE) {
        OG_THROW_ERROR(ERR_PARAMETER_OVER_RANGE, "SHD_SOCKET_TIMEOUT", (int64)0, (int64)OG_MAX_TIMEOUT_VALUE);
        return OG_ERROR;
    }
    def->set_type = SET_SHD_SOCKET_TIMEOUT;
    def->shd_socket_timeout = value;
    return lex_expected_end(lex);
}

static status_t sql_parse_set_tenant(sql_stmt_t *stmt, lex_t *lex, altset_def_t *def, const altset_item_t *item)
{
    word_t value;

    if (lex_expected_fetch_word(lex, "=") != OG_SUCCESS) {
        return OG_ERROR;
    }

    OG_RETURN_IFERR(lex_expected_fetch(lex, &value));
    def->set_type = SET_TENANT;

    if (sql_copy_name(stmt->context, &value.text.value, &def->tenant) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return lex_expected_end(lex);
}

static status_t sql_parse_set_outer_join_opt(sql_stmt_t *stmt, lex_t *lex, altset_def_t *def, const altset_item_t *item)
{
    uint32 match_id;

    OG_RETURN_IFERR(lex_expected_fetch_word(lex, "="));
    OG_RETURN_IFERR(lex_expected_fetch_1of2((lex_t *)lex, "OFF", "ON", &match_id));

    def->on_off = (match_id == 0) ? OG_FALSE : OG_TRUE;
    def->set_type = SET_OUTER_JOIN_OPT;

    return lex_expected_end(lex);
}

static status_t sql_parse_set_cbo_index_caching(sql_stmt_t *stmt, lex_t *lex, altset_def_t *def,
    const altset_item_t *item)
{
    uint32 value;
    if (lex_expected_fetch_word(lex, "=") != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (lex_expected_fetch_uint32(lex, &value) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (value > CBO_MAX_INDEX_CACHING) {
        OG_THROW_ERROR(ERR_PARAMETER_TOO_LARGE, "CBO_INDEX_CACHING", (int64)CBO_MAX_INDEX_CACHING);
        return OG_ERROR;
    }

    def->set_type = SET_CBO_INDEX_CACHING;
    def->cbo_index_caching = value;
    return lex_expected_end(lex);
}

static status_t sql_parse_set_cbo_index_cost_adj(sql_stmt_t *stmt, lex_t *lex, altset_def_t *def,
    const altset_item_t *item)
{
    uint32 value;
    if (lex_expected_fetch_word(lex, "=") != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (lex_expected_fetch_uint32(lex, &value) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (value > CBO_MAX_INDEX_COST_ADJ) {
        OG_THROW_ERROR(ERR_PARAMETER_TOO_LARGE, "CBO_INDEX_COST_ADJ", (int64)CBO_MAX_INDEX_COST_ADJ);
        return OG_ERROR;
    } else if (value < CBO_MIN_INDEX_COST_ADJ) {
        OG_THROW_ERROR(ERR_PARAMETER_TOO_SMALL, "CBO_INDEX_COST_ADJ", (int64)CBO_MIN_INDEX_COST_ADJ);
        return OG_ERROR;
    }
    def->set_type = SET_CBO_INDEX_COST_ADJ;
    def->cbo_index_cost_adj = value;
    return lex_expected_end(lex);
}

static status_t sql_parse_set_withas_subquery(sql_stmt_t *stmt, lex_t *lex, altset_def_t *def,
    const altset_item_t *item)
{
    uint32 value;
    const char *match_words[] = { "OPTIMIZER", "MATERIALIZE", "INLINE" };

    if (lex_expected_fetch_word(lex, "=") != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (lex_expected_fetch_1of3(lex, match_words[0], match_words[1], match_words[2], &value) != OG_SUCCESS) {
        return OG_ERROR;
    }

    def->set_type = SET_WITHAS_SUBQUERY;
    def->withas_subquery = value;
    return lex_expected_end(lex);
}

static status_t sql_parse_set_cursor_sharing(sql_stmt_t *stmt, lex_t *lex, altset_def_t *def, const altset_item_t *item)
{
    uint32 match_id;

    OG_RETURN_IFERR(lex_expected_fetch_word(lex, "="));
    OG_RETURN_IFERR(lex_expected_fetch_1of2((lex_t *)lex, "OFF", "ON", &match_id));

    def->on_off = (match_id == 0) ? OG_FALSE : OG_TRUE;
    def->set_type = SET_CURSOR_SHARING;

    return lex_expected_end(lex);
}

static status_t sql_parse_set_plan_display_format(sql_stmt_t *stmt, lex_t *lex, altset_def_t *def,
    const altset_item_t *item)
{
    uint32 value = 0;

    if (lex_expected_fetch_word(lex, "=") != OG_SUCCESS) {
        return OG_ERROR;
    }

    uint32 format_index = OG_INVALID_ID32;
    // 3 is PLAN_DISPLAY_OPTION_COUNT
    bool32 option_flag[3] = { OG_FALSE };
    char str[OG_PARAM_BUFFER_SIZE];

    OG_RETURN_IFERR(sql_get_plan_display_format_info(lex, &format_index, option_flag));

    OG_RETURN_IFERR(sql_normalize_plan_display_format_value(str, format_index, option_flag));

    sql_set_plan_display_format(str, &value);

    def->set_type = SET_PLAN_DISPLAY_FORMAT;
    def->plan_display_format = value;
    return lex_expected_end(lex);
}

static inline status_t sql_parse_altses_set(sql_stmt_t *stmt, lex_t *lex, altset_def_t *def)
{
    static const altset_item_t g_altsession_items[] = {
        { { "commit_wait", 11 }, SET_COMMIT, OG_INVALID_ID32, sql_parse_set_commit_wait_logging },
        { { "commit_wait_logging", 19 }, SET_COMMIT, OG_INVALID_ID32, sql_parse_set_commit_wait_logging },
        { { "commit_logging", 14 }, SET_COMMIT, OG_INVALID_ID32, sql_parse_set_commit_mode },
        { { "commit_mode", 11 }, SET_COMMIT, OG_INVALID_ID32, sql_parse_set_commit_mode },
        { { "lock_wait_timeout", 17 }, SET_LOCKWAIT_TIMEOUT, OG_INVALID_ID32, sql_parse_set_lockwait_timeout },
        { { "current_schema", 14 }, SET_SCHEMA, OG_INVALID_ID32, sql_parse_set_curr_schema },
        { { "time_zone", 9 }, SET_SESSION_TIMEZONE, OG_INVALID_ID32, sql_parse_set_session_timezone },
        { { "_show_explain_predicate", 23 }, SET_SHOW_EXPLAIN_PREDICATE, OG_INVALID_ID32, sql_parse_set_show_explain_predicate },
        { { "shd_socket_timeout", 18 }, SET_SHD_SOCKET_TIMEOUT, OG_INVALID_ID32, sql_parse_set_shd_socket_timeout },
        { { "tenant", 6 }, SET_TENANT, OG_INVALID_ID32, sql_parse_set_tenant },
        { { "_outer_join_optimization", 24 }, SET_OUTER_JOIN_OPT, OG_INVALID_ID32, sql_parse_set_outer_join_opt },
        { { "cbo_index_caching", 17 }, SET_CBO_INDEX_CACHING, OG_INVALID_ID32, sql_parse_set_cbo_index_caching },
        { { "cbo_index_cost_adj", 18 }, SET_CBO_INDEX_COST_ADJ, OG_INVALID_ID32, sql_parse_set_cbo_index_cost_adj },
        { { "_withas_subquery", 16 }, SET_WITHAS_SUBQUERY, OG_INVALID_ID32, sql_parse_set_withas_subquery },
        { { "_cursor_sharing", 15 }, SET_CURSOR_SHARING, OG_INVALID_ID32, sql_parse_set_cursor_sharing },
        { { "plan_display_format", 19 }, SET_PLAN_DISPLAY_FORMAT, OG_INVALID_ID32, sql_parse_set_plan_display_format },
    };

    bool32 found = OG_FALSE;
    status_t status;
    word_t word;

    if (lex_expected_fetch(lex, &word) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (word.type != WORD_TYPE_VARIANT && (((key_wid_t)word.id) != KEY_WORD_TIMEZONE) &&
        (((key_wid_t)word.id) != KEY_WORD_TENANT)) {
        OG_SRC_THROW_ERROR_EX(lex->loc, ERR_SQL_SYNTAX_ERROR, "missing or invalid parameter");
        return OG_ERROR;
    }

    status =
        sql_try_parse_altset_item(stmt, lex, &word, g_altsession_items, ELEMENT_COUNT(g_altsession_items), def, &found);
    OG_RETURN_IFERR(status);
    if (found) {
        return OG_SUCCESS;
    }

    status = sql_try_parse_altset_item(stmt, lex, &word, sql_get_altset_nlsitems(), NLS__MAX_PARAM_NUM, def, &found);
    OG_RETURN_IFERR(status);
    if (found) {
        return OG_SUCCESS;
    }

    OG_SRC_THROW_ERROR_EX(lex->loc, ERR_SQL_SYNTAX_ERROR, "missing or invalid parameter");
    return OG_ERROR;
}

static const word_record_t g_altsess_opt_set[] = {
    {.id = ABLE_TRIGGERS, .tuple = { 1, { "TRIGGERS" } } },
    {.id = ABLE_INAV_TO, .tuple = { 2, { "INTERACTIVE", "TIMEOUT" } } },
    {.id = ABLE_NOLOGGING, .tuple = { 1, { "NOLOGGING" } } },
    {.id = ABLE_OPTINFO, .tuple = { 1, { "OPTINFO_LOG" } } },
};
#define ALT_SESS_OPT_SIZE ELEMENT_COUNT(g_altsess_opt_set)

static inline status_t sql_parse_altses_able(sql_stmt_t *stmt, lex_t *lex, altable_def_t *def)
{
    uint32 matched_id;

    OG_RETURN_IFERR(lex_try_match_records(lex, g_altsess_opt_set, ALT_SESS_OPT_SIZE, (uint32 *)&matched_id));

    if (matched_id == OG_INVALID_ID32) {
        OG_SRC_THROW_ERROR_EX(lex->loc, ERR_SQL_SYNTAX_ERROR, "missing or invalid parameter");
        return OG_ERROR;
    }
    def->able_type = (altable_type_t)matched_id;
    return lex_expected_end(lex);
}

static status_t sql_parse_alter_session(sql_stmt_t *stmt)
{
    status_t status;
    word_t word;
    lex_t *lex = stmt->session->lex;
    alter_session_def_t *def = NULL;

    if (sql_alloc_mem(stmt->context, sizeof(alter_session_def_t), (void **)&def) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (lex_expected_fetch(stmt->session->lex, &word) != OG_SUCCESS) {
        return OG_ERROR;
    }

    switch ((key_wid_t)word.id) {
        case KEY_WORD_SET:
            def->action = ALTSES_SET;
            status = sql_parse_altses_set(stmt, lex, &def->setting);
            break;

        case KEY_WORD_DISABLE:
            def->action = ALTSES_DISABLE;
            def->setable.enable = OG_FALSE;
            status = sql_parse_altses_able(stmt, lex, &def->setable);
            break;

        case KEY_WORD_ENABLE:
            def->action = ALTSES_ENABLE;
            def->setable.enable = OG_TRUE;
            status = sql_parse_altses_able(stmt, lex, &def->setable);
            break;

        default:
            OG_SRC_THROW_ERROR_EX(word.text.loc, ERR_SQL_SYNTAX_ERROR, "key word expected but %s found", W2S(&word));
            return OG_ERROR;
    }

    stmt->context->type = OGSQL_TYPE_ALTER_SESSION;
    stmt->context->entry = def;

    return status;
}


status_t sql_parse_dcl_alter(sql_stmt_t *stmt)
{
    word_t word;
    status_t status;

    status = lex_fetch(stmt->session->lex, &word);
    OG_RETURN_IFERR(status);

    switch ((uint32)word.id) {
        case KEY_WORD_SYSTEM:
            status = sql_parse_alter_system(stmt);
            break;

        case KEY_WORD_SESSION:
            status = sql_parse_alter_session(stmt);
            break;

        default:
            OG_THROW_ERROR_EX(ERR_SQL_SYNTAX_ERROR, "object type expected but %s found", W2S(&word));
            status = OG_ERROR;
            break;
    }

    return status;
}

status_t sql_bison_verify_sys_param(sql_stmt_t *stmt, knl_alter_sys_def_t *def)
{
    config_item_t *item = NULL;
    knl_session_t *se = &stmt->session->knl_session;

    if (IS_LOG_MODE(def->param)) {
        text_t name = {
            .str = "_LOG_LEVEL",
            .len = sizeof("_LOG_LEVEL") - 1
        };
        item = cm_get_config_item(GET_CONFIG, &name, OG_TRUE);
    } else {
        text_t name = {
            .str = def->param,
            .len = (uint32)strlen(def->param)
        };
        item = cm_get_config_item(GET_CONFIG, &name, OG_TRUE);
    }

    if (item == NULL) {
        OG_THROW_ERROR(ERR_INVALID_PARAMETER_NAME, def->param);
        return OG_ERROR;
    }

    def->param_id = item->id;

    if (se->kernel->db.ctrl.core.lrep_mode == LOG_REPLICATION_ON &&
        strcmp(def->param, "ARCH_TIME") == 0) {
        OG_THROW_ERROR(ERR_NOT_COMPATIBLE, "arch time while lrep_mode is LOG_REPLICATION_ON");
        return OG_ERROR;
    }

    if (sql_bison_sys_param_needs_verify(item)) {
        return sql_bison_verify_sys_param_value(stmt, item, def);
    }

    return sql_bison_normalize_sys_param_value(item, def);
}

status_t sql_parse_altses_set_bison(sql_stmt_t *stmt, altset_def_t *def, const char *key, const char *value,
    source_location_t loc)
{
    text_t key_text;
    text_t value_text;
    uint32 uint32_value;
    char buf[OG_PARAM_BUFFER_SIZE];

    cm_str2text((char *)key, &key_text);
    cm_str2text((char *)value, &value_text);

    if (sql_copy_text(stmt->context, &key_text, &def->pkey) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (cm_text_str_equal_ins(&key_text, "commit_wait") || cm_text_str_equal_ins(&key_text, "commit_wait_logging")) {
        if (cm_text_str_equal_ins(&value_text, "WAIT")) {
            def->commit.nowait = OG_FALSE;
        } else if (cm_text_str_equal_ins(&value_text, "NOWAIT")) {
            def->commit.nowait = OG_TRUE;
        } else {
            OG_SRC_THROW_ERROR_EX(loc, ERR_SQL_SYNTAX_ERROR, "invalid parameter value");
            return OG_ERROR;
        }
        def->set_type = SET_COMMIT;
        def->commit.action = COMMIT_WAIT;
        return OG_SUCCESS;
    }

    if (cm_text_str_equal_ins(&key_text, "commit_logging") || cm_text_str_equal_ins(&key_text, "commit_mode")) {
        if (cm_text_str_equal_ins(&value_text, "IMMEDIATE")) {
            def->commit.batch = OG_FALSE;
        } else if (cm_text_str_equal_ins(&value_text, "BATCH")) {
            def->commit.batch = OG_TRUE;
        } else {
            OG_SRC_THROW_ERROR_EX(loc, ERR_SQL_SYNTAX_ERROR, "invalid parameter value");
            return OG_ERROR;
        }
        def->set_type = SET_COMMIT;
        def->commit.action = COMMIT_LOGGING;
        return OG_SUCCESS;
    }

    if (cm_text_str_equal_ins(&key_text, "lock_wait_timeout")) {
        if (cm_text2uint32(&value_text, &uint32_value) != OG_SUCCESS) {
            OG_SRC_THROW_ERROR_EX(loc, ERR_SQL_SYNTAX_ERROR, "invalid parameter value");
            return OG_ERROR;
        }
        def->set_type = SET_LOCKWAIT_TIMEOUT;
        def->lock_wait_timeout.lock_wait_timeout = uint32_value;
        return OG_SUCCESS;
    }

    if (cm_text_str_equal_ins(&key_text, "current_schema")) {
        text_t schema;
        if (value_text.len == 0) {
            OG_SRC_THROW_ERROR(loc, ERR_EMPTY_STRING_NOT_ALLOWED);
            return OG_ERROR;
        }
        if (cm_text2str(&value_text, buf, OG_NAME_BUFFER_SIZE) != OG_SUCCESS) {
            return OG_ERROR;
        }
        if (sql_user_prefix_tenant(stmt->session, buf) != OG_SUCCESS) {
            return OG_ERROR;
        }
        cm_str2text(buf, &schema);
        def->set_type = SET_SCHEMA;
        return sql_copy_name(stmt->context, &schema, &def->curr_schema);
    }

    if (cm_text_str_equal_ins(&key_text, "time_zone")) {
        def->set_type = SET_SESSION_TIMEZONE;
        return sql_copy_name(stmt->context, &value_text, &def->timezone_offset_name);
    }

    if (cm_text_str_equal_ins(&key_text, "_show_explain_predicate")) {
        if (cm_text_str_equal_ins(&value_text, "FALSE")) {
            def->on_off = OG_FALSE;
        } else if (cm_text_str_equal_ins(&value_text, "TRUE")) {
            def->on_off = OG_TRUE;
        } else {
            OG_SRC_THROW_ERROR_EX(loc, ERR_SQL_SYNTAX_ERROR, "invalid parameter value");
            return OG_ERROR;
        }
        def->set_type = SET_SHOW_EXPLAIN_PREDICATE;
        return OG_SUCCESS;
    }

    if (cm_text_str_equal_ins(&key_text, "shd_socket_timeout")) {
        if (cm_text2uint32(&value_text, &uint32_value) != OG_SUCCESS) {
            OG_SRC_THROW_ERROR_EX(loc, ERR_SQL_SYNTAX_ERROR, "invalid parameter value");
            return OG_ERROR;
        }
        if (uint32_value > OG_MAX_TIMEOUT_VALUE) {
            OG_THROW_ERROR(ERR_PARAMETER_OVER_RANGE, "SHD_SOCKET_TIMEOUT", (int64)0, (int64)OG_MAX_TIMEOUT_VALUE);
            return OG_ERROR;
        }
        def->set_type = SET_SHD_SOCKET_TIMEOUT;
        def->shd_socket_timeout = uint32_value;
        return OG_SUCCESS;
    }

    if (cm_text_str_equal_ins(&key_text, "tenant")) {
        def->set_type = SET_TENANT;
        return sql_copy_name(stmt->context, &value_text, &def->tenant);
    }

    if (cm_text_str_equal_ins(&key_text, "_outer_join_optimization")) {
        if (cm_text_str_equal_ins(&value_text, "OFF")) {
            def->on_off = OG_FALSE;
        } else if (cm_text_str_equal_ins(&value_text, "ON")) {
            def->on_off = OG_TRUE;
        } else {
            OG_SRC_THROW_ERROR_EX(loc, ERR_SQL_SYNTAX_ERROR, "invalid parameter value");
            return OG_ERROR;
        }
        def->set_type = SET_OUTER_JOIN_OPT;
        return OG_SUCCESS;
    }

    if (cm_text_str_equal_ins(&key_text, "cbo_index_caching")) {
        if (cm_text2uint32(&value_text, &uint32_value) != OG_SUCCESS) {
            OG_SRC_THROW_ERROR_EX(loc, ERR_SQL_SYNTAX_ERROR, "invalid parameter value");
            return OG_ERROR;
        }
        if (uint32_value > CBO_MAX_INDEX_CACHING) {
            OG_THROW_ERROR(ERR_PARAMETER_TOO_LARGE, "CBO_INDEX_CACHING", (int64)CBO_MAX_INDEX_CACHING);
            return OG_ERROR;
        }
        def->set_type = SET_CBO_INDEX_CACHING;
        def->cbo_index_caching = uint32_value;
        return OG_SUCCESS;
    }

    if (cm_text_str_equal_ins(&key_text, "cbo_index_cost_adj")) {
        if (cm_text2uint32(&value_text, &uint32_value) != OG_SUCCESS) {
            OG_SRC_THROW_ERROR_EX(loc, ERR_SQL_SYNTAX_ERROR, "invalid parameter value");
            return OG_ERROR;
        }
        if (uint32_value > CBO_MAX_INDEX_COST_ADJ) {
            OG_THROW_ERROR(ERR_PARAMETER_TOO_LARGE, "CBO_INDEX_COST_ADJ", (int64)CBO_MAX_INDEX_COST_ADJ);
            return OG_ERROR;
        }
        if (uint32_value < CBO_MIN_INDEX_COST_ADJ) {
            OG_THROW_ERROR(ERR_PARAMETER_TOO_SMALL, "CBO_INDEX_COST_ADJ", (int64)CBO_MIN_INDEX_COST_ADJ);
            return OG_ERROR;
        }
        def->set_type = SET_CBO_INDEX_COST_ADJ;
        def->cbo_index_cost_adj = uint32_value;
        return OG_SUCCESS;
    }

    if (cm_text_str_equal_ins(&key_text, "_withas_subquery")) {
        if (cm_text_str_equal_ins(&value_text, "OPTIMIZER")) {
            def->withas_subquery = 0;
        } else if (cm_text_str_equal_ins(&value_text, "MATERIALIZE")) {
            def->withas_subquery = 1;
        } else if (cm_text_str_equal_ins(&value_text, "INLINE")) {
            def->withas_subquery = 2;
        } else {
            OG_SRC_THROW_ERROR_EX(loc, ERR_SQL_SYNTAX_ERROR, "invalid parameter value");
            return OG_ERROR;
        }
        def->set_type = SET_WITHAS_SUBQUERY;
        return OG_SUCCESS;
    }

    if (cm_text_str_equal_ins(&key_text, "_cursor_sharing")) {
        if (cm_text_str_equal_ins(&value_text, "OFF")) {
            def->on_off = OG_FALSE;
        } else if (cm_text_str_equal_ins(&value_text, "ON")) {
            def->on_off = OG_TRUE;
        } else {
            OG_SRC_THROW_ERROR_EX(loc, ERR_SQL_SYNTAX_ERROR, "invalid parameter value");
            return OG_ERROR;
        }
        def->set_type = SET_CURSOR_SHARING;
        return OG_SUCCESS;
    }

    if (cm_text_str_equal_ins(&key_text, "plan_display_format")) {
        uint32 plan_display_format = 0;
        if (cm_text2str(&value_text, buf, sizeof(buf)) != OG_SUCCESS) {
            return OG_ERROR;
        }
        sql_set_plan_display_format(buf, &plan_display_format);
        def->set_type = SET_PLAN_DISPLAY_FORMAT;
        def->plan_display_format = plan_display_format;
        return OG_SUCCESS;
    }

    for (uint32 i = 0; i < NLS__MAX_PARAM_NUM; i++) {
        if (cm_text_equal_ins(&key_text, &g_nlsparam_items[i].key)) {
            def->set_type = SET_NLS_PARAMS;
            def->nls_seting.id = g_nlsparam_items[i].id;
            return sql_copy_text(stmt->context, &value_text, &def->nls_seting.value);
        }
    }

    OG_SRC_THROW_ERROR_EX(loc, ERR_SQL_SYNTAX_ERROR, "missing or invalid parameter");
    return OG_ERROR;
}

status_t sql_parse_sid_serial_bison(text_t *src, source_location_t loc, uint32 *sid, uint32 *serial, uint32 *nodeid)
{
    text_t text = *src;
    text_t text1 = *src;
    int32 arrint0;
    int32 arrint1;
    int32 arrint2 = 0;

    if (OG_SUCCESS != sql_parse_int_until(&arrint0, &text, ",")) {
        OG_SRC_THROW_ERROR_EX(loc, ERR_SQL_SYNTAX_ERROR, "invalid session id '%s'", T2S(src));
        return OG_ERROR;
    }

    if (OG_SUCCESS != sql_parse_int_until(&arrint1, &text, ",")) {
        OG_SRC_THROW_ERROR_EX(loc, ERR_SQL_SYNTAX_ERROR, "invalid session id '%s'", T2S(src));
        return OG_ERROR;
    }

    if (text.len == 0) {
        // only have sid and serial
        // 2 find two comma
        if (sql_parse_num_match_splitter(&text1, 2, ',')) {
            // The parsing str should have one splitter
            OG_SRC_THROW_ERROR_EX(loc, ERR_SQL_SYNTAX_ERROR, "invalid session id '%s'", T2S(src));
            return OG_ERROR;
        }
    } else {
        // have sid and serial and nodeid
        if (!IS_COORDINATOR) {
            OG_SRC_THROW_ERROR_EX(loc, ERR_SQL_SYNTAX_ERROR, "not support to kill other session on DN node");
            return OG_ERROR;
        }
        if (text.str[0] != '@') {
            OG_SRC_THROW_ERROR_EX(loc, ERR_SQL_SYNTAX_ERROR, "invalid session id '%s'", T2S(src));
            return OG_ERROR;
        }
        ++text.str;
        --text.len;
        if (OG_SUCCESS != sql_parse_int_until(&arrint2, &text, NULL)) {
            OG_SRC_THROW_ERROR_EX(loc, ERR_SQL_SYNTAX_ERROR, "invalid session id '%s'", T2S(src));
            return OG_ERROR;
        }
    }

    if (arrint0 < 0 || arrint1 < 0 || arrint2 < 0) {
        OG_SRC_THROW_ERROR_EX(loc, ERR_SQL_SYNTAX_ERROR, "invalid session id '%s'", T2S(src));
        return OG_ERROR;
    }

    *sid = (uint32)arrint0;
    *serial = (uint32)arrint1;
    *nodeid = (uint32)arrint2;

    return OG_SUCCESS;
}

#ifdef __cplusplus
}
#endif