/* -------------------------------------------------------------------------
 *  This file is part of the oGRAC project.
 * Copyright (c) 2024 Huawei Technologies Co.,Ltd.
 *
 * oGRAC is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * ogsql_select.c
 *
 *
 * IDENTIFICATION
 * src/ogsql/executor/ogsql_select.c
 *
 * -------------------------------------------------------------------------
 */
#include "ogsql_select.h"
#include "ogsql_mtrl.h"
#include "ogsql_scan.h"
#include "ogsql_aggr.h"
#include "ogsql_sort.h"
#include "ogsql_union.h"
#include "ogsql_proj.h"
#include "ogsql_group.h"
#include "ogsql_sort_group.h"
#include "ogsql_index_group.h"
#include "ogsql_hash_mtrl.h"
#include "ogsql_distinct.h"
#include "ogsql_nl_join.h"
#include "ogsql_limit.h"
#include "ogsql_connect.h"
#include "ogsql_filter.h"
#include "ogsql_minus.h"
#include "ogsql_winsort.h"
#include "ogsql_privilege.h"
#include "../../server/srv_instance.h"
#include "ogsql_concate.h"
#include "ogsql_update.h"
#include "ogsql_group_cube.h"
#include "ogsql_unpivot.h"
#include "ogsql_withas_mtrl.h"
#include "ogsql_connect_mtrl.h"
#include "ogsql_vm_view_mtrl.h"
#include "gdv_context.h"
#include "ogsql_hash_join.h"
#include "ogsql_merge_join.h"

static inline status_t sql_check_node_pending(sql_cursor_t *parent_cursor, uint32 tab, bool32 *pending)
{
    sql_table_cursor_t *table_cur = &parent_cursor->tables[tab];

    if (parent_cursor->table_count > 1 && table_cur->table->plan_id > parent_cursor->last_table) {
        *pending = OG_TRUE;
    }
    return OG_SUCCESS;
}

#ifdef OG_RAC_ING
status_t sql_get_parent_remote_table_id(sql_cursor_t *parent_cursor, uint32 tab_id, uint32 *remote_id)
{
    sql_table_cursor_t *table_cur = NULL;

    // all parent table is distribute_none
    *remote_id = tab_id;
    for (uint32 i = 0; i < parent_cursor->table_count; i++) {
        table_cur = &parent_cursor->tables[i];
        OG_CONTINUE_IFTRUE(table_cur->table == NULL);
        OG_CONTINUE_IFTRUE(table_cur->table->remote_type == distribute_none);
        OG_CONTINUE_IFTRUE(table_cur->table->remote_query == NULL);
        for (uint32 j = 0; j < table_cur->table->remote_query->tables.count; j++) {
            sql_table_t *sql_tab = (sql_table_t *)sql_array_get(&table_cur->table->remote_query->tables, j);
            if (tab_id == sql_tab->id) {
                *remote_id = sql_tab->remote_id;
                return OG_SUCCESS;
            }
        }
    }
    return OG_SUCCESS;
}
#endif

status_t sql_check_sub_select_pending(sql_cursor_t *parent_cursor, sql_select_t *select_context, bool32 *pending)
{
    uint32 table_id;
    parent_ref_t *parent_ref = NULL;

    if (select_context->parent_refs->count == 0) {
        return OG_SUCCESS;
    }

    for (uint32 i = 0; i < select_context->parent_refs->count; i++) {
        parent_ref = (parent_ref_t *)cm_galist_get(select_context->parent_refs, i);
        table_id = parent_ref->tab;
        OG_RETURN_IFERR(sql_check_node_pending(parent_cursor, table_id, pending));
        OG_BREAK_IF_TRUE(*pending);
    }
    return OG_SUCCESS;
}

status_t sql_fetch_cursor(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan_node, bool32 *eof)
{
    status_t status;
    CM_TRACE_BEGIN;
    if (cursor->eof) {
        *eof = OG_TRUE;
        return OG_SUCCESS;
    }
    OG_RETURN_IFERR(sql_stack_safe(stmt));

    switch (plan_node->type) {
        case PLAN_NODE_QUERY:
            status = sql_fetch_query(stmt, cursor, plan_node->query.next, eof);
            break;

        case PLAN_NODE_UNION:
            status = sql_fetch_hash_union(stmt, cursor, plan_node, eof);
            break;

        case PLAN_NODE_UNION_ALL:
            status = sql_fetch_union_all(stmt, cursor, plan_node, eof);
            break;

        case PLAN_NODE_MINUS:
            status = sql_fetch_minus(stmt, cursor, plan_node, eof);
            break;

        case PLAN_NODE_HASH_MINUS:
	        knl_panic(0);
            status = OG_ERROR;
            break;

        case PLAN_NODE_SELECT_SORT:
            status = sql_fetch_sort(stmt, cursor, plan_node, eof);
            break;

        case PLAN_NODE_SELECT_LIMIT:
            status = sql_fetch_limit(stmt, cursor, plan_node, eof);
            break;

        case PLAN_NODE_WITHAS_MTRL:
            status = sql_fetch_withas_mtrl(stmt, cursor, plan_node, eof);
            break;

        case PLAN_NODE_VM_VIEW_MTRL:
            status = sql_fetch_vm_view_mtrl(stmt, cursor, plan_node, eof);
            break;

        default:
            status = sql_fetch_query(stmt, cursor, plan_node, eof);
            SQL_CHECK_SESSION_VALID_FOR_RETURN(stmt);
            return status;
    }

    SQL_CHECK_SESSION_VALID_FOR_RETURN(stmt);
    CM_TRACE_END(stmt, plan_node->plan_id);
    return status;
}

status_t sql_fetch_join(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan_node, bool32 *eof)
{
    if (cursor->eof) {
        *eof = OG_TRUE;
        return OG_SUCCESS;
    }
    CM_TRACE_BEGIN;
    switch (plan_node->join_p.oper) {
        case JOIN_OPER_NL:
            OG_RETURN_IFERR(sql_fetch_nest_loop(stmt, cursor, plan_node, eof));
            break;

        case JOIN_OPER_NL_BATCH:
            OG_RETURN_IFERR(sql_fetch_nest_loop_batch(stmt, cursor, plan_node, eof));
            break;

        case JOIN_OPER_NL_LEFT:
            OG_RETURN_IFERR(sql_fetch_nest_loop_left(stmt, cursor, plan_node, eof));
            break;

        case JOIN_OPER_NL_FULL:
            OG_RETURN_IFERR(sql_fetch_nest_loop_full(stmt, cursor, plan_node, eof));
            break;

        case JOIN_OPER_HASH:
        case JOIN_OPER_HASH_PAR:
        case JOIN_OPER_HASH_LEFT:
        case JOIN_OPER_HASH_RIGHT_LEFT:
        case JOIN_OPER_HASH_SEMI:
        case JOIN_OPER_HASH_ANTI:
        case JOIN_OPER_HASH_ANTI_NA:
        case JOIN_OPER_HASH_RIGHT_SEMI:
        case JOIN_OPER_HASH_RIGHT_ANTI:
        case JOIN_OPER_HASH_RIGHT_ANTI_NA:
        case JOIN_OPER_HASH_FULL:
            OG_RETURN_IFERR(g_hash_join_funcs[plan_node->join_p.oper].fetch(stmt, cursor, plan_node, eof));
            break;

        case JOIN_OPER_MERGE:
        case JOIN_OPER_MERGE_LEFT:
        case JOIN_OPER_MERGE_FULL:
            OG_RETURN_IFERR(og_merge_join_fetch(stmt, cursor, plan_node, eof));
            break;

        default:
            return OG_ERROR;
    }

    CM_TRACE_END(stmt, plan_node->plan_id);
    return OG_SUCCESS;
}

static void sql_set_cursor_cond(sql_cursor_t *cur, sql_query_t *query)
{
    if (query->connect_by_cond == NULL) {
        cur->cond = query->cond;
        return;
    }

    if (cur->connect_data.last_level_cursor == NULL) {
        cur->cond = query->start_with_cond;
    } else {
        cur->cond = query->connect_by_cond;
    }
}

static status_t sql_init_outer_join_exec_data(sql_stmt_t *stmt, sql_cursor_t *cur, uint32 count)
{
    uint32 i;
    uint32 mem_cost_size;

    mem_cost_size = count * sizeof(outer_join_data_t);
    OG_RETURN_IFERR(vmc_alloc(&cur->vmc, mem_cost_size, (void **)&cur->exec_data.outer_join));

    for (i = 0; i < count; ++i) {
        cur->exec_data.outer_join[i].need_reset_right = OG_TRUE;
        cur->exec_data.outer_join[i].right_matched = OG_FALSE;
        cur->exec_data.outer_join[i].need_swap_driver = OG_FALSE;
        cur->exec_data.outer_join[i].nl_full_opt_ctx = NULL;
    }
    return OG_SUCCESS;
}

static status_t sql_init_inner_join_exec_data(sql_stmt_t *stmt, sql_cursor_t *cursor, uint32 count)
{
    if (count == 0) {
        return OG_SUCCESS;
    }
    uint32 i;
    uint32 mem_cost_size;

    mem_cost_size = count * sizeof(inner_join_data_t);
    OG_RETURN_IFERR(vmc_alloc(&cursor->vmc, mem_cost_size, (void **)&cursor->exec_data.inner_join));

    for (i = 0; i < count; ++i) {
        cursor->exec_data.inner_join[i].right_fetched = OG_FALSE;
    }
    return OG_SUCCESS;
}

static status_t sql_init_aggr_dis_exec_data(sql_stmt_t *stmt, sql_cursor_t *cursor, uint32 count)
{
    uint32 i;
    uint32 mem_cost_size = sizeof(hash_segment_t) + count * sizeof(hash_table_entry_t);

    OG_RETURN_IFERR(vmc_alloc(&cursor->vmc, mem_cost_size, (void **)&cursor->exec_data.aggr_dis));

    hash_segment_t *hash_seg = (hash_segment_t *)cursor->exec_data.aggr_dis;
    vm_hash_segment_init(KNL_SESSION(stmt), stmt->mtrl.pool, hash_seg, PMA_POOL, HASH_PAGES_HOLD, HASH_AREA_SIZE);

    hash_table_entry_t *table_entry = (hash_table_entry_t *)(cursor->exec_data.aggr_dis + sizeof(hash_segment_t));
    for (i = 0; i < count; i++) {
        table_entry[i].vmid = OG_INVALID_ID32;
        table_entry[i].offset = OG_INVALID_ID32;
    }

    return OG_SUCCESS;
}

static status_t sql_init_rscol_defs_exec_data(sql_cursor_t *cursor, galist_t *rs_cols, char **buf)
{
    uint32 mem_cost_size;
    uint32 i;
    og_type_t *types = NULL;
    rs_column_t *rs_col = NULL;

    if (*buf == NULL) {
        mem_cost_size = sizeof(uint32) + rs_cols->count * sizeof(og_type_t);
        OG_RETURN_IFERR(vmc_alloc(&cursor->vmc, mem_cost_size, (void **)buf));
        *(uint32 *)*buf = mem_cost_size;
    }

    types = (og_type_t *)(*buf + sizeof(uint32));

    for (i = 0; i < rs_cols->count; i++) {
        rs_col = (rs_column_t *)cm_galist_get(rs_cols, i);
        types[i] = rs_col->datatype;
    }

    return OG_SUCCESS;
}

static status_t sql_init_nl_batch_exec_data(sql_stmt_t *stmt, sql_cursor_t *cur, uint32 count)
{
    uint32 mem_cost_size = count * sizeof(nl_batch_data_t);
    OG_RETURN_IFERR(vmc_alloc(&cur->vmc, mem_cost_size, (void **)&cur->exec_data.nl_batch));

    for (uint32 i = 0; i < count; ++i) {
        cur->exec_data.nl_batch[i].cache_cur = NULL;
    }
    return OG_SUCCESS;
}

static status_t sql_init_merge_join_exec_data(sql_stmt_t *stmt, sql_cursor_t *cur, uint32 count)
{
    uint32 mem_cost_size = count * sizeof(join_data_t);
    OG_RETURN_IFERR(vmc_alloc(&cur->vmc, mem_cost_size, (void **)&cur->m_join));

    for (uint32 i = 0; i < count; i++) {
        cur->m_join[i].left = cur->m_join[i].right = NULL;
    }
    return OG_SUCCESS;
}

status_t sql_generate_cursor_exec_data(sql_stmt_t *stmt, sql_cursor_t *cur, sql_query_t *query)
{
    sql_query_t *s_query = query->s_query;

    if (query->join_assist.outer_node_count > 0 || (s_query != NULL && s_query->join_assist.outer_node_count > 0)) {
        uint32 outer_plan_count = query->join_assist.outer_plan_count;
        if (s_query != NULL) {
            outer_plan_count = MAX(outer_plan_count, s_query->join_assist.outer_plan_count);
        }
        OG_RETURN_IFERR(sql_init_outer_join_exec_data(stmt, cur, outer_plan_count));
    }

    // start-with clause may generated new join plan node
    if (query->tables.count > 1 || s_query != NULL) {
        uint32 inner_plan_count = query->join_assist.inner_plan_count;
        if (s_query != NULL) {
            inner_plan_count = MAX(inner_plan_count, s_query->join_assist.inner_plan_count);
        }
        OG_RETURN_IFERR(sql_init_inner_join_exec_data(stmt, cur, inner_plan_count));
    }

    if (query->aggr_dis_count > 0) {
        OG_RETURN_IFERR(sql_init_aggr_dis_exec_data(stmt, cur, query->aggr_dis_count));
    }

    if (cur->select_ctx != NULL && cur->select_ctx->pending_col_count > 0) {
        OG_RETURN_IFERR(sql_init_rscol_defs_exec_data(cur, query->rs_columns, &cur->mtrl.rs.buf));
    }

    if (stmt->context->nl_batch_cnt > 0) {
        OG_RETURN_IFERR(sql_init_nl_batch_exec_data(stmt, cur, stmt->context->nl_batch_cnt));
    }

    if (query->join_assist.mj_plan_count > 0 || (s_query != NULL && s_query->join_assist.mj_plan_count > 0)) {
        uint32 mj_plan_count = query->join_assist.mj_plan_count;
        if (s_query != NULL) {
            mj_plan_count = MAX(mj_plan_count, s_query->join_assist.mj_plan_count);
        }
        OG_RETURN_IFERR(sql_init_merge_join_exec_data(stmt, cur, mj_plan_count));
    }

    return OG_SUCCESS;
}

uint16 sql_get_decode_count(sql_table_t *table)
{
    bilist_node_t *node = NULL;
    query_field_t *query_field = NULL;

    if (table->query_fields.count == 0) {
        return 0;
    }
    node = cm_bilist_tail(&table->query_fields);
    query_field = BILIST_NODE_OF(query_field_t, node, bilist_node);
    return (uint16)(query_field->col_id + 1);
}

static inline status_t sql_calc_rownum_core(sql_stmt_t *stmt, cond_node_t *node, expr_node_t *var_expr,
    cmp_type_t cmp_type, uint32 *max_rownum)
{
    variant_t tmp_var;
    variant_t *var = NULL;

    if (var_expr->type != EXPR_NODE_CONST && var_expr->type != EXPR_NODE_PARAM) {
        return OG_SUCCESS;
    }

    if (NODE_IS_CONST(var_expr)) {
        var = &var_expr->value;
    } else {
        OG_RETURN_IFERR(sql_exec_expr_node(stmt, var_expr, &tmp_var));
        var = &tmp_var;
        if (var->is_null) {
            *max_rownum = 0U;
            return OG_SUCCESS;
        }
    }

    switch (cmp_type) {
        case CMP_TYPE_EQUAL:
            /* rownum=v, v<1 ==> false */
            /* rownum=v, v is real with non-zero tail (e.g., 2.3) */
            OG_RETURN_IFERR(var_as_real(var));
            if (var->v_real < 1 || (fabs(var->v_real - (uint32)(int32)var->v_real) >= OG_REAL_PRECISION)) {
                *max_rownum = 0U;
            } else {
                *max_rownum = (uint32)(int32)var->v_real;
            }
            break;

        case CMP_TYPE_LESS:
            /* rownum<v, v<=1  ==> false */
            OG_RETURN_IFERR(var_as_real(var));
            if (var->v_real <= 1) {
                *max_rownum = 0U;
            } else {
                *max_rownum = (uint32)(int32)ceil(var->v_real - 1.0);
            }
            break;

        case CMP_TYPE_LESS_EQUAL:
            /* rownum<=v, v<1  ==> false */
            OG_RETURN_IFERR(var_as_real(var));
            if (var->v_real < 1) {
                *max_rownum = 0U;
            } else {
                *max_rownum = (uint32)(int32)var->v_real;
            }
            break;

        default:
            *max_rownum = OG_INFINITE32;
            break;
    }

    return OG_SUCCESS;
}

static inline status_t sql_calc_rownum_right(sql_stmt_t *stmt, cond_node_t *node, uint32 *max_rownum)
{
    cmp_node_t *cmp_node = node->cmp;

    switch (cmp_node->type) {
        case CMP_TYPE_IS_NULL:
            *max_rownum = 0U;
            return OG_SUCCESS;

        case CMP_TYPE_IS_NOT_NULL:
            *max_rownum = OG_INFINITE32;
            return OG_SUCCESS;

        case CMP_TYPE_EQUAL:
        case CMP_TYPE_LESS:
        case CMP_TYPE_LESS_EQUAL:
        case CMP_TYPE_GREAT_EQUAL:
        case CMP_TYPE_GREAT:
        case CMP_TYPE_NOT_EQUAL:
            return sql_calc_rownum_core(stmt, node, cmp_node->right->root, cmp_node->type, max_rownum);

        default:
            break;
    }
    return OG_SUCCESS;
}

static inline status_t sql_calc_rownum_left(sql_stmt_t *stmt, cond_node_t *node, uint32 *max_row_num)
{
    cmp_node_t *cmp_node = node->cmp;

    switch (cmp_node->type) {
        case CMP_TYPE_GREAT_EQUAL:
            return sql_calc_rownum_core(stmt, node, cmp_node->left->root, CMP_TYPE_LESS_EQUAL, max_row_num);

        case CMP_TYPE_GREAT:
            return sql_calc_rownum_core(stmt, node, cmp_node->left->root, CMP_TYPE_LESS, max_row_num);

        case CMP_TYPE_EQUAL:
        case CMP_TYPE_LESS:
        case CMP_TYPE_LESS_EQUAL:
        case CMP_TYPE_NOT_EQUAL:
            return sql_calc_rownum_core(stmt, node, cmp_node->left->root, cmp_node->type, max_row_num);

        default:
            break;
    }
    return OG_SUCCESS;
}

static inline status_t sql_calc_cmp_rownum(sql_stmt_t *stmt, cond_node_t *node, uint32 *max_row_num)
{
    cmp_node_t *cmp_node = node->cmp;
    expr_node_t *left = NULL;
    expr_node_t *right = NULL;

    *max_row_num = OG_INFINITE32;

    /* already set rnum_pending flag in rbo_try_rownum_optmz */
    if (!cmp_node->rnum_pending) {
        return OG_SUCCESS;
    }

    left = cmp_node->left->root;
    if (NODE_IS_RES_ROWNUM(left)) {
        OG_RETURN_IFERR(sql_calc_rownum_right(stmt, node, max_row_num));
    }

    if (cmp_node->type == CMP_TYPE_IS_NULL || cmp_node->type == CMP_TYPE_IS_NOT_NULL) {
        return OG_SUCCESS;
    }

    right = cmp_node->right->root;
    if ((cmp_node->right->next == NULL) && NODE_IS_RES_ROWNUM(right)) {
        OG_RETURN_IFERR(sql_calc_rownum_left(stmt, node, max_row_num));
    }

    return OG_SUCCESS;
}

static inline status_t sql_calc_cond_rownum(sql_stmt_t *stmt, cond_node_t *node, uint32 *rnum_upper)
{
    uint32 l_upper;
    uint32 r_upper;

    OG_RETURN_IFERR(sql_stack_safe(stmt));

    switch (node->type) {
        case COND_NODE_COMPARE:
            OG_RETURN_IFERR(sql_calc_cmp_rownum(stmt, node, rnum_upper));
            break;

        case COND_NODE_OR:
            OG_RETURN_IFERR(sql_calc_cond_rownum(stmt, node->left, &l_upper));
            OG_RETURN_IFERR(sql_calc_cond_rownum(stmt, node->right, &r_upper));
            *rnum_upper = MAX(l_upper, r_upper);
            break;

        case COND_NODE_AND:
            OG_RETURN_IFERR(sql_calc_cond_rownum(stmt, node->left, &l_upper));
            OG_RETURN_IFERR(sql_calc_cond_rownum(stmt, node->right, &r_upper));
            *rnum_upper = MIN(l_upper, r_upper);
            break;

        default:
            *rnum_upper = OG_INFINITE32;
            break;
    }

    return OG_SUCCESS;
}

static void sql_set_cursor_max_rownum(sql_stmt_t *stmt, sql_cursor_t *cursor, sql_query_t *query)
{
    cursor->max_rownum = OG_INFINITE32;
    if (cursor->connect_data.last_level_cursor != NULL) {
        return;
    }

    cond_tree_t *cond = NULL;
    if (query->join_assist.outer_node_count > 0 && query->filter_cond != NULL) { // only used for rownum filter
        cond = query->filter_cond;
        cursor->max_rownum = cond->rownum_upper;
    } else if (query->cond != NULL) { // used for rownum filter, also used for rownum count
        cond = query->cond;
        cursor->max_rownum = cond->rownum_upper;
    }

    if (cursor->max_rownum == OG_INFINITE32 && cond != NULL && cond->rownum_pending) {
        (void)sql_calc_cond_rownum(stmt, cond->root, &cursor->max_rownum);
    }
}

status_t sql_fetch_rownum(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan, bool32 *eof)
{
    if (cursor->rownum >= cursor->max_rownum) {
        cursor->connect_data.cur_level_cursor = NULL;
        *eof = OG_TRUE;
        return OG_SUCCESS;
    }
    cursor->rownum++;
    OG_RETURN_IFERR(sql_fetch_query(stmt, cursor, plan->rownum_p.next, eof));
    if (*eof) {
        return OG_SUCCESS;
    }
    return OG_SUCCESS;
}

static status_t sql_open_table_cursor(sql_stmt_t *stmt, sql_cursor_t *cursor, sql_table_cursor_t *table_cur,
    knl_cursor_action_t cursor_action, bool32 is_select)
{
    sql_table_t *table = table_cur->table;
    sql_query_t *query = cursor->query;

    table_cur->scan_flag = table->tf_scan_flag;
    table_cur->hash_table = OG_FALSE;
    table_cur->action = cursor_action;
    sql_init_varea_set(stmt, table_cur);

    if (table->version.type == CURR_VERSION) {
        table_cur->scn = cursor->scn;
    } else {
        bool32 scn_type = (table->version.type == SCN_VERSION) ? OG_TRUE : OG_FALSE;
        OG_RETURN_IFERR(sql_convert_to_scn(stmt, table->version.expr, scn_type, &table_cur->scn));
    }

    is_select = is_select && cursor->select_ctx;
    if (OG_IS_SUBSELECT_TABLE(table->type)) {
        OG_RETURN_IFERR(sql_alloc_cursor(stmt, &table_cur->sql_cur));
        table_cur->sql_cur->scn = table_cur->scn;
        table_cur->sql_cur->select_ctx = table->select_ctx;
        table_cur->sql_cur->plan = table->select_ctx->plan;
        table_cur->sql_cur->global_cached = cursor->global_cached || table->global_cached;
        table_cur->sql_cur->ancestor_ref = cursor;
        table_cur->sql_cur->select_ctx->for_update_params.type =
            is_select ? cursor->select_ctx->for_update_params.type : ROWMARK_WAIT_BLOCK;
        table_cur->sql_cur->select_ctx->for_update_params.wait_seconds =
            is_select ? cursor->select_ctx->for_update_params.wait_seconds : 0;
        return OG_SUCCESS;
    }

    if (table->type == JSON_TABLE) {
        MEMS_RETURN_IFERR(
            memset_sp(&table_cur->json_table_exec, sizeof(json_table_exec_t), 0, sizeof(json_table_exec_t)));
    }

    OG_RETURN_IFERR(sql_alloc_knl_cursor(stmt, &table_cur->knl_cur));
    table_cur->knl_cur->action = (IF_LOCK_IN_FETCH(query) && table->for_update) ? CURSOR_ACTION_UPDATE : cursor_action;
    table_cur->knl_cur->for_update_fetch = table->for_update;
    table_cur->knl_cur->rowmark.type = is_select ? cursor->select_ctx->for_update_params.type : ROWMARK_WAIT_BLOCK;
    table_cur->knl_cur->rowmark.wait_seconds = is_select ? cursor->select_ctx->for_update_params.wait_seconds : 0;
    table_cur->knl_cur->update_info.count = 0;
    table_cur->knl_cur->global_cached = cursor->global_cached || table->global_cached;

    if (is_select) {
        table_cur->knl_cur->decode_count = sql_get_decode_count(table);
    }

    return OG_SUCCESS;
}

status_t sql_open_cursors(sql_stmt_t *stmt, sql_cursor_t *cur, sql_query_t *query, knl_cursor_action_t cursor_action,
    bool32 is_select)
{
    if (cur->is_open) {
        sql_close_cursor(stmt, cur);
    }

    cur->is_open = OG_TRUE;
    cur->select_ctx = query->owner;
    cur->query = query;
    cur->eof = OG_FALSE;
    cur->rownum = 0;

    sql_set_cursor_cond(cur, query);
    sql_set_cursor_max_rownum(stmt, cur, query);

    cur->columns = query->rs_columns;
    sql_reset_mtrl(stmt, cur);
    cur->table_count = 0;
    cur->winsort_ready = OG_FALSE;
    /*
     * @NOTE
     * "cur->table_count" is related to the sql_close_cursor() in dml_executor.c
     * if the counting method changed, check sql_close_cursor() too.
     */
    sql_array_t *tables = sql_get_query_tables(cur, query);
    OG_RETURN_IFERR(sql_alloc_table_cursors(cur, tables->count));

    for (uint32 i = 0; i < tables->count; i++) {
        sql_table_t *table = (sql_table_t *)sql_array_get(tables, i);
        cur->tables[i].table = table;
        cur->id_maps[i] = table->id;

        OG_RETURN_IFERR(sql_open_table_cursor(stmt, cur, &cur->tables[i], cursor_action, is_select));

        cur->table_count++;
    }

    sql_init_ssa_cursor_maps(cur, query->ssa.count);
    /* generate exec data for open cur, exec data contains:
       out join, aggr_distinct and etc.
    */
    return sql_generate_cursor_exec_data(stmt, cur, query);
}

static inline void sql_open_cursor_4_hash_mtrl(sql_stmt_t *stmt, sql_cursor_t *cur, sql_query_t *query)
{
    if (cur->is_open) {
        sql_close_cursor(stmt, cur);
    }
    sql_reset_mtrl(stmt, cur);
    cur->eof = OG_FALSE;
    cur->query = query;
    cur->rownum = 0;
    cur->is_open = OG_TRUE;
    cur->select_ctx = query->owner;
}

status_t sql_open_query_cursor(sql_stmt_t *stmt, sql_cursor_t *cursor, sql_query_t *query)
{
    if (cursor->hash_mtrl_ctx != NULL) {
        sql_open_cursor_4_hash_mtrl(stmt, cursor, query);
        return OG_SUCCESS;
    }
    return sql_open_cursors(stmt, cursor, query, CURSOR_ACTION_SELECT, OG_TRUE);
}

status_t sql_fetch_query(sql_stmt_t *stmt, sql_cursor_t *cur, plan_node_t *plan, bool32 *eof)
{
    status_t status = OG_SUCCESS;

    if (cur->eof) {
        *eof = OG_TRUE;
        return OG_SUCCESS;
    }

    /* reset cached sequence */
    sql_reset_sequence(stmt);

    CM_TRACE_BEGIN;

    switch (plan->type) {
        case PLAN_NODE_SCAN:
            status = sql_fetch_scan(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_CONCATE:
            status = sql_fetch_concate(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_REMOTE_SCAN:
	    knl_panic(0);
            break;

        case PLAN_NODE_GROUP_MERGE:
	    knl_panic(0);
            break;
        case PLAN_NODE_JOIN:
            status = sql_fetch_join(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_QUERY_SORT:
            status = sql_fetch_sort(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_QUERY_SIBL_SORT:
            status = sql_fetch_sibl_sort(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_SORT_GROUP:
            status = sql_fetch_sort_group(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_MERGE_SORT_GROUP:
            status = sql_fetch_merge_sort_group(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_HASH_GROUP:
        case PLAN_NODE_HASH_GROUP_PAR:
        case PLAN_NODE_HASH_GROUP_PIVOT:
            status = sql_fetch_hash_group_new(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_INDEX_GROUP:
            status = sql_fetch_index_group(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_AGGR:
        case PLAN_NODE_INDEX_AGGR:
            status = sql_fetch_aggr(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_QUERY_LIMIT:
            status = sql_fetch_limit(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_SORT_DISTINCT:
            status = sql_fetch_sort_distinct(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_HASH_DISTINCT:
            status = sql_fetch_hash_distinct(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_INDEX_DISTINCT:
            status = sql_fetch_index_distinct(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_HAVING:
            status = sql_fetch_having(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_CONNECT:
            // cur->connect_data.first_level_cursor is the same with cur
            if (cur->connect_data.cur_level_cursor == NULL || cur->connect_data.cur_level_cursor == cur) {
                cur->connect_data.first_level_rownum++;
            }
            status = sql_fetch_connect(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_CONNECT_HASH:
            status = sql_fetch_connect_hash(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_FILTER:
            status = sql_fetch_filter(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_WINDOW_SORT:
            status = sql_fetch_winsort(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_HASH_MTRL:
            status = sql_fetch_hash_mtrl(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_GROUP_CUBE:
            status = sql_fetch_group_cube(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_UNPIVOT:
            status = sql_fetch_unpivot(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_ROWNUM:
            status = sql_fetch_rownum(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_FOR_UPDATE:
            status = sql_fetch_for_update(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_CONNECT_MTRL:
            status = sql_fetch_connect_mtrl(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_WITHAS_MTRL:
            status = sql_fetch_withas_mtrl(stmt, cur, plan, eof);
            break;

        case PLAN_NODE_VM_VIEW_MTRL:
            status = sql_fetch_vm_view_mtrl(stmt, cur, plan, eof);
            break;

        default:
            status = sql_fetch_scan(stmt, cur, plan, eof);
            break;
    }

    SQL_CHECK_SESSION_VALID_FOR_RETURN(stmt);
    if (!IS_QUERY_SCAN_PLAN(plan->type)) {
        CM_TRACE_END(stmt, plan->plan_id);
    }
    return status;
}

status_t sql_execute_join(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan, bool32 *eof)
{
    CM_TRACE_BEGIN;
    switch (plan->join_p.oper) {
        case JOIN_OPER_NL:
            OG_RETURN_IFERR(sql_execute_nest_loop(stmt, cursor, plan, eof));
            break;

        case JOIN_OPER_NL_BATCH:
            OG_RETURN_IFERR(sql_execute_nest_loop_batch(stmt, cursor, plan, eof));
            break;

        case JOIN_OPER_NL_LEFT:
            OG_RETURN_IFERR(sql_execute_nest_loop_left(stmt, cursor, plan, eof));
            break;

        case JOIN_OPER_NL_FULL:
            OG_RETURN_IFERR(sql_execute_nest_loop_full(stmt, cursor, plan, eof));
            break;

        case JOIN_OPER_HASH:
        case JOIN_OPER_HASH_PAR:
        case JOIN_OPER_HASH_LEFT:
        case JOIN_OPER_HASH_RIGHT_LEFT:
        case JOIN_OPER_HASH_SEMI:
        case JOIN_OPER_HASH_ANTI:
        case JOIN_OPER_HASH_ANTI_NA:
        case JOIN_OPER_HASH_RIGHT_SEMI:
        case JOIN_OPER_HASH_RIGHT_ANTI:
        case JOIN_OPER_HASH_RIGHT_ANTI_NA:
        case JOIN_OPER_HASH_FULL:
            OG_RETURN_IFERR(g_hash_join_funcs[plan->join_p.oper].execute(stmt, cursor, plan, eof));
            break;

        case JOIN_OPER_MERGE:
        case JOIN_OPER_MERGE_LEFT:
        case JOIN_OPER_MERGE_FULL:
            OG_RETURN_IFERR(og_merge_join_execute(stmt, cursor, plan, eof));
            break;

        default:
            return OG_ERROR;
    }
    CM_TRACE_END(stmt, plan->plan_id);
    return OG_SUCCESS;
}

static status_t sql_execute_rownum(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    if (cursor->rownum >= cursor->max_rownum) {
        cursor->eof = OG_TRUE;
        return OG_SUCCESS;
    }
    cursor->rownum++;
    OG_RETURN_IFERR(sql_execute_query_plan(stmt, cursor, plan->rownum_p.next));
    cursor->rownum--;
    return OG_SUCCESS;
}

static status_t sql_make_mtrl_row_for_update(sql_stmt_t *stmt, sql_cursor_t *cursor, sql_cursor_t *query_cur)
{
    row_assist_t ra;
    char *buf = NULL;
    mtrl_rowid_t rid;
    sql_table_cursor_t *table_cur = NULL;

    OG_RETURN_IFERR(sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&buf));
    for (uint32 i = 0; i < query_cur->table_count; i++) {
        table_cur = &query_cur->tables[query_cur->id_maps[i]];
        if (table_cur->table->subslct_tab_usage != SUBSELECT_4_NORMAL_JOIN) {
            row_init(&ra, buf, OG_MAX_ROW_SIZE, 1);
        } else {
            OG_RETURN_IFERR(
                sql_make_mtrl_table_rs_row(stmt, query_cur, query_cur->tables, table_cur->table, buf, OG_MAX_ROW_SIZE));
        }
        OG_RETURN_IFERR(mtrl_insert_row(&stmt->mtrl, cursor->mtrl.for_update, buf, &rid));
    }
    OGSQL_POP(stmt);
    return OG_SUCCESS;
}

static status_t lock_row_for_update(sql_stmt_t *stmt, sql_cursor_t *cursor, for_update_plan_t *for_update_p,
    bool32 *retry)
{
    variant_t value;
    bool32 is_found = OG_FALSE;
    expr_tree_t *expr = NULL;
    sql_table_cursor_t *table_cur = NULL;

    for (uint32 i = 0; i < for_update_p->rowids->count; i++) {
        expr = (expr_tree_t *)cm_galist_get(for_update_p->rowids, i);
        OG_RETURN_IFERR(sql_exec_expr(stmt, expr, &value));
        if (value.is_null) {
            continue;
        }
        table_cur = &cursor->tables[ROWID_EXPR_TAB(expr)];
        OG_RETURN_IFERR(sql_var2rowid(&value, &table_cur->knl_cur->rowid, table_cur->knl_cur->dc_type));
        SQL_CURSOR_PUSH(stmt, cursor);
        OG_RETURN_IFERR(knl_fetch_by_rowid(KNL_SESSION(stmt), table_cur->knl_cur, &is_found));
        SQL_CURSOR_POP(stmt);
        if (!is_found || table_cur->knl_cur->scn > KNL_SESSION(stmt)->query_scn) {
            *retry = OG_TRUE;
            return OG_SUCCESS;
        }
    }
    return OG_SUCCESS;
}

static status_t sql_mtrl_for_update(sql_stmt_t *stmt, sql_cursor_t *cursor, sql_cursor_t *query_cur, plan_node_t *plan,
    bool32 *retry)
{
    bool32 eof = OG_FALSE;
    status_t status = OG_ERROR;
    cond_tree_t *save_cond = cursor->cond;

    *retry = OG_FALSE;
    OG_RETURN_IFERR(sql_open_cursors(stmt, query_cur, cursor->query, CURSOR_ACTION_SELECT, OG_TRUE));
    OG_RETURN_IFERR(mtrl_create_segment(&stmt->mtrl, MTRL_SEGMENT_RS, NULL, &cursor->mtrl.for_update));
    OG_RETURN_IFERR(mtrl_open_segment(&stmt->mtrl, cursor->mtrl.for_update));

    SQL_CURSOR_PUSH(stmt, query_cur);
    if (sql_execute_query_plan(stmt, query_cur, plan->for_update.next) != OG_SUCCESS) {
        mtrl_close_segment(&stmt->mtrl, cursor->mtrl.for_update);
        return OG_ERROR;
    }

    cursor->cond = NULL;
    OGSQL_SAVE_STACK(stmt);

    for (;;) {
        OG_BREAK_IF_ERROR(sql_fetch_query(stmt, query_cur, plan->for_update.next, &eof));
        if (eof) {
            status = OG_SUCCESS;
            break;
        }

        OG_BREAK_IF_ERROR(lock_row_for_update(stmt, cursor, &plan->for_update, retry));
        if (*retry) {
            status = OG_SUCCESS;
            break;
        }
        OG_BREAK_IF_ERROR(sql_make_mtrl_row_for_update(stmt, cursor, query_cur));
        OGSQL_RESTORE_STACK(stmt);
    }
    OGSQL_RESTORE_STACK(stmt);
    SQL_CURSOR_POP(stmt);
    cursor->cond = save_cond;
    mtrl_close_segment(&stmt->mtrl, cursor->mtrl.for_update);
    return status;
}

static status_t sql_init_update_cursor(sql_stmt_t *stmt, sql_cursor_t *cursor)
{
    for (uint32 i = 0; i < cursor->table_count; i++) {
        sql_table_cursor_t *table_cur = &cursor->tables[cursor->id_maps[i]];
        if (OG_IS_SUBSELECT_TABLE(table_cur->table->type)) {
            sql_open_select_cursor(stmt, table_cur->sql_cur, table_cur->sql_cur->plan->select_p.rs_columns);
            continue;
        }
        if (table_cur->table->type == NORMAL_TABLE && table_cur->table->for_update) {
            table_cur->knl_cur->action = CURSOR_ACTION_UPDATE;
            table_cur->knl_cur->scan_mode = SCAN_MODE_ROWID;
            OG_RETURN_IFERR(knl_open_cursor(KNL_SESSION(stmt), table_cur->knl_cur, &table_cur->table->entry->dc));
        }
    }
    return OG_SUCCESS;
}

static status_t sql_reinit_update_cursor(sql_stmt_t *stmt, sql_cursor_t *cursor)
{
    OGSQL_RELEASE_SEGMENT(stmt, cursor->mtrl.for_update);
    for (uint32 i = 0; i < cursor->table_count; i++) {
        sql_table_cursor_t *table_cur = &cursor->tables[cursor->id_maps[i]];
        if (table_cur->table->type == NORMAL_TABLE && table_cur->table->for_update) {
            OG_RETURN_IFERR(knl_open_cursor(KNL_SESSION(stmt), table_cur->knl_cur, &table_cur->table->entry->dc));
        }
    }
    return OG_SUCCESS;
}

static inline void modify_scn_for_update(sql_stmt_t *stmt)
{
    knl_set_session_scn(KNL_SESSION(stmt), OG_INVALID_ID64);
    sql_set_scn(stmt);
}

static status_t sql_execute_for_update(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    if (IF_LOCK_IN_FETCH(cursor->query)) {
        return sql_execute_query_plan(stmt, cursor, plan->for_update.next);
    }

    bool32 retry = OG_FALSE;
    status_t status = OG_ERROR;
    sql_cursor_t *query_cur = NULL;

    OG_RETURN_IFERR(sql_init_update_cursor(stmt, cursor));
    OG_RETURN_IFERR(sql_alloc_cursor(stmt, &query_cur));

    while (OG_TRUE) {
        SQL_CHECK_SESSION_VALID_FOR_RETURN(stmt);

        OG_BREAK_IF_ERROR(sql_mtrl_for_update(stmt, cursor, query_cur, plan, &retry));
        if (!retry) {
            status = OG_SUCCESS;
            break;
        }
        modify_scn_for_update(stmt);
        OG_BREAK_IF_ERROR(sql_reinit_update_cursor(stmt, cursor));
    };
    sql_free_cursor(stmt, query_cur);
    OG_RETURN_IFERR(status);
    cursor->last_table = OG_INVALID_ID32;
    return mtrl_open_rs_cursor(&stmt->mtrl, cursor->mtrl.for_update, &cursor->mtrl.cursor);
}

static void pad_cursor_row_for_update(sql_table_cursor_t *table_cur, mtrl_cursor_t *mtrl_cursor)
{
    if (OG_IS_SUBSELECT_TABLE(table_cur->table->type)) {
        mtrl_row_t *row = &table_cur->sql_cur->mtrl.cursor.row;
        row->data = mtrl_cursor->row.data;
        cm_decode_row(mtrl_cursor->row.data, row->offsets, row->lens, NULL);
        return;
    }

    knl_cursor_t *knl_cur = table_cur->knl_cur;
    knl_cur->row = (row_head_t *)mtrl_cursor->row.data;
    cm_decode_row(mtrl_cursor->row.data, knl_cur->offsets, knl_cur->lens, NULL);
    if (table_cur->table->type == NORMAL_TABLE) {
        uint32 rs_row_size = ((row_head_t *)knl_cur->row)->size;
        knl_cur->rowid = *(rowid_t *)(knl_cur->row + rs_row_size - KNL_ROWID_LEN);
    }
}

static status_t mtrl_move_cursor_for_update(mtrl_context_t *ogx, mtrl_cursor_t *mtrl_cursor)
{
    if (mtrl_cursor->slot < (uint32)mtrl_cursor->rs_page->rows) {
        mtrl_cursor->row.data = MTRL_GET_ROW(mtrl_cursor->rs_page, mtrl_cursor->slot);
        mtrl_cursor->slot++;
        return OG_SUCCESS;
    }

    mtrl_cursor->history[mtrl_cursor->count++] = mtrl_cursor->rs_vmid;
    vm_ctrl_t *ctrl = vm_get_ctrl(ogx->pool, mtrl_cursor->rs_vmid);
    mtrl_cursor->rs_vmid = ctrl->next;
    if (mtrl_cursor->rs_vmid == OG_INVALID_ID32) {
        OG_THROW_ERROR(ERR_VM, "invalid for update rs rows");
        return OG_ERROR;
    }
    vm_page_t *page = NULL;
    OG_RETURN_IFERR(mtrl_open_page(ogx, mtrl_cursor->rs_vmid, &page));
    mtrl_cursor->rs_page = (mtrl_page_t *)page->data;
    mtrl_cursor->row.data = MTRL_GET_ROW(mtrl_cursor->rs_page, 0);
    mtrl_cursor->slot = 1;
    return OG_SUCCESS;
}

static status_t sql_fetch_mtrl_row_for_update(mtrl_context_t *ogx, sql_cursor_t *cursor, bool32 *eof)
{
    sql_table_cursor_t *table_cur = NULL;
    mtrl_cursor_t *mtrl_cursor = &cursor->mtrl.cursor;

    mtrl_close_history_page(ogx, mtrl_cursor);

    for (uint32 i = 0; i < cursor->table_count; i++) {
        table_cur = &cursor->tables[cursor->id_maps[i]];
        if (i == 0) {
            OG_RETURN_IFERR(mtrl_move_rs_cursor(ogx, mtrl_cursor));
            if (mtrl_cursor->eof) {
                *eof = OG_TRUE;
                return OG_SUCCESS;
            }
        } else {
            OG_RETURN_IFERR(mtrl_move_cursor_for_update(ogx, mtrl_cursor));
        }
        pad_cursor_row_for_update(table_cur, mtrl_cursor);
    }
    return OG_SUCCESS;
}

status_t sql_fetch_for_update(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan, bool32 *eof)
{
    if (IF_LOCK_IN_FETCH(cursor->query)) {
        return sql_fetch_query(stmt, cursor, plan->for_update.next, eof);
    }

    return sql_fetch_mtrl_row_for_update(&stmt->mtrl, cursor, eof);
}

status_t sql_execute_query_plan(sql_stmt_t *stmt, sql_cursor_t *cur, plan_node_t *plan)
{
    OG_RETURN_IFERR(sql_stack_safe(stmt));
    CM_TRACE_BEGIN;
    status_t status;
    switch (plan->type) {
        case PLAN_NODE_SCAN:
            status = sql_execute_scan(stmt, cur, plan);
            break;

        case PLAN_NODE_CONCATE:
            status = sql_execute_concate(stmt, cur, plan);
            break;

        case PLAN_NODE_JOIN:
            status = sql_execute_join(stmt, cur, plan, &cur->eof);
            break;

        case PLAN_NODE_SORT_GROUP:
            status = sql_execute_sort_group(stmt, cur, plan);
            break;

        case PLAN_NODE_MERGE_SORT_GROUP:
            status = sql_execute_merge_sort_group(stmt, cur, plan);
            break;

        case PLAN_NODE_HASH_GROUP:
        case PLAN_NODE_HASH_GROUP_PAR:
        case PLAN_NODE_HASH_GROUP_PIVOT:
            status = sql_execute_hash_group_new(stmt, cur, plan);
            break;

        case PLAN_NODE_UNPIVOT:
            status = sql_execute_unpivot(stmt, cur, plan);
            break;

        case PLAN_NODE_QUERY_SORT:
        case PLAN_NODE_QUERY_SORT_PAR:
            status = sql_execute_query_sort(stmt, cur, plan);
            break;
        case PLAN_NODE_QUERY_SIBL_SORT:
            status = sql_execute_query_sibl_sort(stmt, cur, plan);
            break;
        case PLAN_NODE_INDEX_GROUP:
            status = sql_execute_index_group(stmt, cur, plan);
            break;

        case PLAN_NODE_AGGR:
            status = sql_execute_aggr(stmt, cur, plan);
            break;

        case PLAN_NODE_INDEX_AGGR:
            status = sql_execute_index_aggr(stmt, cur, plan);
            break;

        case PLAN_NODE_SORT_DISTINCT:
            status = sql_execute_sort_distinct(stmt, cur, plan);
            break;

        case PLAN_NODE_HASH_DISTINCT:
            status = sql_execute_hash_distinct(stmt, cur, plan);
            break;

        case PLAN_NODE_INDEX_DISTINCT:
            status = sql_execute_index_distinct(stmt, cur, plan);
            break;

        case PLAN_NODE_HAVING:
            status = sql_execute_query_plan(stmt, cur, plan->having.next);
            break;

        case PLAN_NODE_QUERY_LIMIT:
            status = sql_execute_query_limit(stmt, cur, plan);
            break;

        case PLAN_NODE_CONNECT:
            status = sql_execute_connect(stmt, cur, plan);
            break;

        case PLAN_NODE_CONNECT_HASH:
            status = sql_execute_connect_hash(stmt, cur, plan);
            break;

        case PLAN_NODE_FILTER:
            status = sql_execute_filter(stmt, cur, plan);
            break;

#ifdef OG_RAC_ING
        case PLAN_NODE_REMOTE_SCAN:
	    knl_panic(0);
            break;

        case PLAN_NODE_GROUP_MERGE:
	    knl_panic(0);
            break;
#endif
        case PLAN_NODE_WINDOW_SORT:
            status = sql_execute_winsort(stmt, cur, plan);
            break;

        case PLAN_NODE_HASH_MTRL:
            status = sql_execute_hash_mtrl(stmt, cur, plan);
            break;

        case PLAN_NODE_GROUP_CUBE:
            status = sql_execute_group_cube(stmt, cur, plan);
            break;

        case PLAN_NODE_ROWNUM:
            status = sql_execute_rownum(stmt, cur, plan);
            break;

        case PLAN_NODE_FOR_UPDATE:
            status = sql_execute_for_update(stmt, cur, plan);
            break;

        case PLAN_NODE_CONNECT_MTRL:
            status = sql_execute_connect_mtrl(stmt, cur, plan);
            break;

        default:
            status = OG_ERROR;
            OG_THROW_ERROR(ERR_UNKNOWN_PLAN_TYPE, (int32)plan->type, "execute sql");
            break;
    }
    if (!IS_QUERY_SCAN_PLAN(plan->type)) {
        CM_TRACE_END(stmt, plan->plan_id);
    }
    return status;
}

status_t sql_execute_query(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    plan_node_t *next = plan->query.next;

    OG_RETURN_IFERR(sql_open_query_cursor(stmt, cursor, plan->query.ref));

    return sql_execute_query_plan(stmt, cursor, next);
}

status_t sql_make_normal_rs(sql_stmt_t *stmt, sql_cursor_t *cursor, sql_fetch_func_t sql_fetch_func,
    sql_send_row_func_t sql_send_row_func)
{
    bool32 is_full = OG_FALSE;

    do {
        OGSQL_SAVE_STACK(stmt);
        if (sql_fetch_func(stmt, cursor, stmt->rs_plan, &cursor->eof) != OG_SUCCESS) {
            OGSQL_RESTORE_STACK(stmt);
            return OG_ERROR;
        }

        if (cursor->eof) {
            my_sender(stmt)->send_column_def(stmt, cursor);
            sql_close_cursor(stmt, cursor);
            OGSQL_RESTORE_STACK(stmt);
            return OG_SUCCESS;
        }

        if (sql_send_row_func(stmt, cursor, &is_full) != OG_SUCCESS) {
            OGSQL_RESTORE_STACK(stmt);
            return OG_ERROR;
        }
        OGSQL_RESTORE_STACK(stmt);

        SQL_CHECK_SESSION_VALID_FOR_RETURN(stmt);
    } while (!is_full);

    my_sender(stmt)->send_column_def(stmt, cursor);
    return OG_SUCCESS;
}

sql_send_row_func_t sql_get_send_row_func(sql_stmt_t *stmt, plan_node_t *plan)
{
    if (SECUREC_UNLIKELY(stmt->rs_type == RS_TYPE_ROW)) {
        return sql_send_ori_row;
    }
    switch (plan->type) {
        case PLAN_NODE_QUERY:
            return sql_get_send_row_func(stmt, plan->query.next);

        case PLAN_NODE_SORT_DISTINCT:
        case PLAN_NODE_SELECT_SORT:
        case PLAN_NODE_MINUS:
        case PLAN_NODE_HASH_MINUS:
            return sql_put_sort_row;
        case PLAN_NODE_QUERY_SORT_PAR:
	    knl_panic(0);
            return sql_send_row;

        case PLAN_NODE_QUERY_SIBL_SORT:
        case PLAN_NODE_QUERY_SORT:
            if (plan->query_sort.has_pending_rs) {
                return sql_send_sort_row_filter;
            }
            return sql_put_sort_row;

        case PLAN_NODE_SELECT_LIMIT:
        case PLAN_NODE_QUERY_LIMIT:
            return sql_get_send_row_func(stmt, plan->limit.next);

        case PLAN_NODE_REMOTE_SCAN:
	    knl_panic(0);

        case PLAN_NODE_WINDOW_SORT:
            return sql_send_sort_row_filter;

        default:
            return sql_send_row;
    }
}

void sql_open_select_cursor(sql_stmt_t *stmt, sql_cursor_t *cur, galist_t *rs_columns)
{
    if (cur->is_open) {
        sql_close_cursor(stmt, cur);
    }

    cur->eof = OG_FALSE;
    cur->rownum = 0;
    sql_reset_mtrl(stmt, cur);
    cur->columns = rs_columns;
    cur->is_open = OG_TRUE;
}

status_t sql_execute_select_plan(sql_stmt_t *stmt, sql_cursor_t *cur, plan_node_t *plan)
{
    status_t status;
    OG_RETURN_IFERR(sql_stack_safe(stmt));
    OG_RETURN_IFERR(SQL_CURSOR_PUSH(stmt, cur));
    CM_TRACE_BEGIN;

    switch (plan->type) {
        case PLAN_NODE_QUERY:
            status = sql_execute_query(stmt, cur, plan);
            break;

        case PLAN_NODE_UNION:
            status = sql_execute_hash_union(stmt, cur, plan);
            break;

        case PLAN_NODE_UNION_ALL:
            status = sql_execute_union_all(stmt, cur, plan);
            break;

        case PLAN_NODE_MINUS:
            status = sql_execute_minus(stmt, cur, plan);
            break;

        case PLAN_NODE_HASH_MINUS:
            knl_panic(0);
            status = OG_ERROR;
            break;

        case PLAN_NODE_SELECT_SORT:
            status = sql_execute_select_sort(stmt, cur, plan);
            break;

        case PLAN_NODE_SELECT_LIMIT:
            status = sql_execute_select_limit(stmt, cur, plan);
            break;

        case PLAN_NODE_WITHAS_MTRL:
            status = sql_execute_withas_mtrl(stmt, cur, plan);
            break;

        case PLAN_NODE_VM_VIEW_MTRL:
            status = sql_execute_vm_view_mtrl(stmt, cur, plan);
            break;

        default:
            OG_THROW_ERROR_EX(ERR_SQL_SYNTAX_ERROR, "Don't support the plan type(%d)", plan->type);
            status = OG_ERROR;
            break;
    }
    CM_TRACE_END(stmt, plan->plan_id);

    // For SQL like "select distinct(union subquery + limit 0) from xxx" that scan no data,
    // @sql_get_select_value would encounter cursor->eof && row = 0. In this case, all columns will set be NULL.
    // To prevent null pointer dereference, set cursor->columns to select_ctx->rs_columns if it is NULL.
    if (cur->eof && cur->columns == NULL) {
        cur->columns = cur->select_ctx->rs_columns;
    }
    
    SQL_CURSOR_POP(stmt);
    return status;
}

status_t sql_execute_select(sql_stmt_t *stmt)
{
    sql_select_t *select = NULL;
    sql_cursor_t *cursor = NULL;
    sql_set_ssn(stmt);
    ogsql_assign_transaction_id(stmt, NULL);
    CM_TRACE_BEGIN;

    select = (sql_select_t *)stmt->context->entry;

    cursor = OGSQL_ROOT_CURSOR(stmt);
    cursor->plan = select->plan;
    cursor->select_ctx = select;
    cursor->scn = OG_INVALID_ID64;
    cursor->found_rows.offset_skipcount = 0;
    cursor->found_rows.limit_skipcount = 0;
    cursor->total_rows = 0;

    stmt->need_send_ddm = OG_TRUE;
    if (sql_execute_select_plan(stmt, cursor, cursor->plan->select_p.next) != OG_SUCCESS) {
        return OG_ERROR;
    }
    stmt->need_send_ddm = OG_FALSE;
    stmt->rs_type = select->rs_type;
    stmt->rs_plan = select->rs_plan;

    if (stmt->plsql_mode == PLSQL_CURSOR && !stmt->cursor_info.is_forcur) {
        return OG_SUCCESS;
    }
    OG_RETURN_IFERR(sql_make_result_set(stmt, cursor));
    CM_TRACE_END(stmt, select->plan->plan_id);
    return OG_SUCCESS;
}

status_t sql_init_multi_update(sql_stmt_t *stmt, sql_cursor_t *cursor, knl_cursor_action_t action,
    knl_cursor_t **knl_curs)
{
    sql_table_cursor_t *table_cur = NULL;
    knl_cursor_t *knl_cur = NULL;

    for (uint32 i = 0; i < cursor->table_count; ++i) {
        table_cur = &cursor->tables[i];

        if (table_cur->table->type != NORMAL_TABLE || table_cur->knl_cur->table != NULL) {
            table_cur->hash_table = OG_FALSE;
            continue;
        }

        OG_RETURN_IFERR(sql_push(stmt, KNL_SESSION(stmt)->kernel->attr.cursor_size, (void **)&knl_cur));
        knl_curs[i] = knl_cur;
        KNL_INIT_CURSOR(knl_cur);
        knl_init_cursor_buf(KNL_SESSION(stmt), knl_cur);
        knl_cur->stmt = stmt;
        knl_cur->action = action;
        knl_cur->update_info.count = 0;
        knl_cur->scan_mode = SCAN_MODE_ROWID;
        if (knl_open_cursor(KNL_SESSION(stmt), knl_cur, &table_cur->table->entry->dc) != OG_SUCCESS) {
            return OG_ERROR;
        }
        SET_ROWID_PAGE(&knl_cur->link_rid, INVALID_PAGID);
        table_cur->hash_table = OG_TRUE;
        table_cur->knl_cur->table = knl_cur->table;
    }
    return OG_SUCCESS;
}

static inline void sql_free_scan(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    sql_table_t *table = plan->scan_p.table;
    sql_table_cursor_t *table_cur = &cursor->tables[table->id];

    if ((table_cur->sql_cur != NULL) && OG_IS_SUBSELECT_TABLE(table->type)) {
        sql_free_cursor(stmt, table_cur->sql_cur);
        table_cur->sql_cur = NULL;
    }
}

static status_t sql_free_join(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    switch (plan->join_p.oper) {
        case JOIN_OPER_HASH:
        case JOIN_OPER_HASH_LEFT:
        case JOIN_OPER_HASH_SEMI:
        case JOIN_OPER_HASH_ANTI:
        case JOIN_OPER_HASH_ANTI_NA:
        case JOIN_OPER_HASH_FULL:
        case JOIN_OPER_HASH_PAR:
            og_hash_join_free_mtrl_cursor(stmt, cursor, plan);
            break;
        case JOIN_OPER_MERGE:
        case JOIN_OPER_MERGE_LEFT:
        case JOIN_OPER_MERGE_FULL:
            og_merge_join_free(stmt, cursor, plan);
            break;
        default:
            break;
    }

    OG_RETURN_IFERR(sql_free_query_mtrl(stmt, cursor, plan->join_p.left));
    OG_RETURN_IFERR(sql_free_query_mtrl(stmt, cursor, plan->join_p.right));

    return OG_SUCCESS;
}

status_t sql_free_query_mtrl(sql_stmt_t *stmt, sql_cursor_t *cur, plan_node_t *plan)
{
    status_t status = OG_SUCCESS;
    OG_RETURN_IFERR(sql_stack_safe(stmt));

    switch (plan->type) {
        case PLAN_NODE_SCAN:
            sql_free_scan(stmt, cur, plan);
            break;

        case PLAN_NODE_CONCATE:
            if (cur->cnct_ctx != NULL) {
                sql_free_concate_ctx(stmt, cur->cnct_ctx);
                cur->cnct_ctx = NULL;
            }
            break;

        case PLAN_NODE_JOIN:
            status = sql_free_join(stmt, cur, plan);
            break;

        case PLAN_NODE_QUERY_SORT:
            sql_sort_mtrl_release_segment(stmt, cur);
            break;

        case PLAN_NODE_QUERY_SIBL_SORT:
            sql_free_sibl_sort(stmt, cur);
            break;

        case PLAN_NODE_SORT_GROUP:
        case PLAN_NODE_HASH_GROUP:
        case PLAN_NODE_HASH_GROUP_PAR:
            if (cur->group_ctx != NULL) {
                sql_free_group_ctx(stmt, cur->group_ctx);
                cur->group_ctx = NULL;
            }
            break;

        case PLAN_NODE_HASH_GROUP_PIVOT:
            if (cur->pivot_ctx != NULL) {
                sql_free_group_ctx(stmt, cur->pivot_ctx);
                cur->pivot_ctx = NULL;
            }
            break;

        case PLAN_NODE_MERGE_SORT_GROUP:
            mtrl_close_cursor(&stmt->mtrl, &cur->mtrl.cursor);
            OGSQL_RELEASE_SEGMENT(stmt, cur->mtrl.group.sid);
            break;

        case PLAN_NODE_INDEX_GROUP:
            OGSQL_RELEASE_SEGMENT(stmt, cur->mtrl.group_index);
            break;

        case PLAN_NODE_SORT_DISTINCT:
        case PLAN_NODE_HASH_DISTINCT:
            if (cur->distinct_ctx != NULL) {
                sql_free_distinct_ctx(cur->distinct_ctx);
                cur->distinct_ctx = NULL;
            }
            break;

        case PLAN_NODE_INDEX_DISTINCT:
            OGSQL_RELEASE_SEGMENT(stmt, cur->mtrl.index_distinct);
            break;

        case PLAN_NODE_CONNECT:
        case PLAN_NODE_CONNECT_HASH:
            sql_free_connect_cursor(stmt, cur);
            break;

        case PLAN_NODE_HASH_MTRL:
            cur->hash_mtrl_ctx = NULL;
            break;

        case PLAN_NODE_GROUP_CUBE:
            if (cur->exec_data.group_cube != NULL) {
                sql_free_group_cube(stmt, cur);
                cur->exec_data.group_cube = NULL;
            }
            break;

        case PLAN_NODE_FOR_UPDATE:
            status = sql_free_query_mtrl(stmt, cur, plan->for_update.next);
            // fall through

        case PLAN_NODE_WINDOW_SORT:
        default:
            break;
    }
    return status;
}