/* -------------------------------------------------------------------------
 *  This file is part of the oGRAC project.
 * Copyright (c) 2024 Huawei Technologies Co.,Ltd.
 *
 * oGRAC is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * ogsql_winsort.c
 *
 *
 * IDENTIFICATION
 * src/ogsql/executor/ogsql_winsort.c
 *
 * -------------------------------------------------------------------------
 */
#include "ogsql_aggr.h"
#include "ogsql_proj.h"
#include "ogsql_select.h"
#include "ogsql_mtrl.h"
#include "ogsql_winsort.h"
#include "srv_instance.h"
#include "ogsql_winsort_window.h"

#define MAX_WINSORT_SIZE 8

typedef struct st_winsort_pair {
    bool32 need_free;
    plan_node_t *plan;
    sql_cursor_t *cursor;
} winsort_pair_t;

typedef struct st_winsort_assist {
    uint32 count;
    winsort_pair_t pairs[MAX_WINSORT_SIZE];
} winsort_assist_t;

static status_t sql_win_set_aggr_value(sql_stmt_t *stmt, variant_t *var, variant_t *result, const char *buf)
{
    switch (result->type) {
        case OG_TYPE_UINT32:
        case OG_TYPE_INTEGER:
        case OG_TYPE_BIGINT:
        case OG_TYPE_REAL:
        case OG_TYPE_NUMBER:
        case OG_TYPE_NUMBER2:
        case OG_TYPE_DECIMAL:
        case OG_TYPE_DATE:
        case OG_TYPE_TIMESTAMP:
        case OG_TYPE_TIMESTAMP_TZ_FAKE:
        case OG_TYPE_TIMESTAMP_TZ:
        case OG_TYPE_TIMESTAMP_LTZ:
        case OG_TYPE_INTERVAL_DS:
        case OG_TYPE_INTERVAL_YM:
        case OG_TYPE_BOOLEAN:
            *result = *var;
            break;

        case OG_TYPE_CHAR:
        case OG_TYPE_VARCHAR:
        case OG_TYPE_STRING:
            if (var->v_text.len == 0) {
                return OG_SUCCESS;
            } else if (var->v_text.len > OG_MAX_ROW_SIZE) {
                OG_THROW_ERROR(ERR_EXCEED_MAX_ROW_SIZE, var->v_text.len, OG_MAX_ROW_SIZE);
                return OG_ERROR;
            }
            result->v_text.str = (char *)((result->v_text.str == NULL) ? buf : result->v_text.str);
            MEMS_RETURN_IFERR(memcpy_s(result->v_text.str, OG_MAX_ROW_SIZE, var->v_text.str, var->v_text.len));
            result->v_text.len = var->v_text.len;
            break;

        case OG_TYPE_BINARY:
        case OG_TYPE_VARBINARY:
        case OG_TYPE_RAW:
            if (var->v_bin.size == 0) {
                return OG_SUCCESS;
            } else if (var->v_bin.size > OG_MAX_ROW_SIZE) {
                OG_THROW_ERROR(ERR_EXCEED_MAX_ROW_SIZE, var->v_bin.size, OG_MAX_ROW_SIZE);
                return OG_ERROR;
            }
            result->v_text.str = (char *)((result->v_text.str == NULL) ? buf : result->v_text.str);
            MEMS_RETURN_IFERR(memcpy_s(result->v_bin.bytes, OG_MAX_ROW_SIZE, var->v_bin.bytes, var->v_bin.size));

            result->v_bin.size = var->v_bin.size;
            break;

        default:
            OG_SET_ERROR_MISMATCH_EX(result->type);
            return OG_ERROR;
    }
    return OG_SUCCESS;
}

static inline status_t sql_win_aggr_last_value(sql_stmt_t *stmt, variant_t *res, variant_t *var, const char *buf,
    bool8 ignore_nulls)
{
    res->is_null = var->is_null;
    return sql_win_set_aggr_value(stmt, var, res, buf);
}

static status_t sql_win_aggr_min_or_max(sql_stmt_t *stmt, sql_aggr_type_t type, variant_t *result, variant_t *var,
    const char *buf)
{
    int32 cmp_result;

    if (result->type != var->type) {
        OG_RETURN_IFERR(sql_convert_variant(stmt, var, result->type));
    }

    if (result->is_null) {
        result->is_null = OG_FALSE;
        return sql_win_set_aggr_value(stmt, var, result, buf);
    }

    OG_RETURN_IFERR(sql_compare_variant(stmt, result, var, &cmp_result));

    if ((type == AGGR_TYPE_MIN && cmp_result > 0) || (type == AGGR_TYPE_MAX && cmp_result < 0)) {
        return sql_win_set_aggr_value(stmt, var, result, buf);
    }
    return OG_SUCCESS;
}

static status_t sql_win_aggr_count(sql_stmt_t *stmt, aggr_var_t *aggr_var, variant_t *var)
{
    char *buf = NULL;
    row_assist_t ra;
    bool32 var_exist = OG_FALSE;
    variant_t *result = &aggr_var->var;
    aggr_count_t *data = GET_AGGR_VAR_COUNT(aggr_var);
    OG_RETVALUE_IFTRUE(data == NULL, OG_ERROR);

    if (data->has_distinct) {
        OGSQL_SAVE_STACK(stmt);
        sql_keep_stack_variant(stmt, var);
        if (sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&buf) != OG_SUCCESS) {
            OGSQL_RESTORE_STACK(stmt);
            return OG_ERROR;
        }
        row_init(&ra, buf, OG_MAX_ROW_SIZE, 1);
        if (sql_put_row_value(stmt, NULL, &ra, var->type, var) != OG_SUCCESS) {
            OGSQL_RESTORE_STACK(stmt);
            return OG_ERROR;
        }
        if (vm_hash_table_insert2(&var_exist, &data->ex_hash_segment, &data->ex_table_entry, buf, ra.head->size) !=
            OG_SUCCESS) {
            OGSQL_RESTORE_STACK(stmt);
            return OG_ERROR;
        }

        OGSQL_RESTORE_STACK(stmt);
        if (var_exist) {
            return OG_SUCCESS;
        }
    }
    result->is_null = OG_FALSE;
    result->v_bigint++;
    return OG_SUCCESS;
}

static status_t sql_win_aggr_listagg(sql_stmt_t *stmt, expr_node_t *aggr_node, sql_aggr_type_t type,
    aggr_var_t *aggr_var, const char *buf)
{
    variant_t value;
    expr_tree_t *arg = aggr_node->argument;

    OGSQL_SAVE_STACK(stmt);
    aggr_var->var.v_text.str = aggr_var->var.is_null ? (char *)buf : aggr_var->var.v_text.str;

    while (arg != NULL) {
        if (aggr_var->var.is_null) {
            aggr_var->var.is_null = OG_FALSE;
            arg = arg->next;
            continue;
        }
        OG_RETURN_IFERR(sql_exec_expr(stmt, arg, &value));
        if (value.is_null) {
            arg = arg->next;
            continue;
        }
        if (!OG_IS_STRING_TYPE(value.type)) {
            if (sql_var_as_string(stmt, &value) != OG_SUCCESS) {
                OGSQL_RESTORE_STACK(stmt);
                return OG_ERROR;
            }
        }

        if (value.v_text.len + aggr_var->var.v_text.len > OG_MAX_ROW_SIZE) {
            OGSQL_RESTORE_STACK(stmt);
            OG_THROW_ERROR(ERR_EXCEED_MAX_ROW_SIZE, value.v_text.len + aggr_var->var.v_text.len, OG_MAX_ROW_SIZE);
            return OG_ERROR;
        }
        if (value.v_text.len > 0) {
            MEMS_RETURN_IFERR(memcpy_s(aggr_var->var.v_text.str + aggr_var->var.v_text.len,
                OG_MAX_ROW_SIZE - aggr_var->var.v_text.len, value.v_text.str, value.v_text.len));
            aggr_var->var.v_text.len += value.v_text.len;
            aggr_var->var.is_null = OG_FALSE;
        }
        arg = arg->next;
        OGSQL_RESTORE_STACK(stmt);
    }
    return OG_SUCCESS;
}

static status_t sql_win_aggr_lag(sql_stmt_t *stmt, sql_aggr_type_t type, variant_t *result, variant_t *var,
    const char *buf)
{
    OG_RETURN_IFERR(sql_win_set_aggr_value(stmt, var, result, buf));
    return OG_SUCCESS;
}

void sql_winsort_aggr_value_null(aggr_var_t *aggr_var, expr_tree_t *func_expr, sql_aggr_type_t aggr_type,
    variant_t *result)
{
    switch (aggr_type) {
        case AGGR_TYPE_LAG:
            result->is_null = OG_TRUE;
            break;
        case AGGR_TYPE_LAST_VALUE:
            if (!func_expr->root->ignore_nulls) {
                result->is_null = OG_TRUE;
            }
            break;
        case AGGR_TYPE_FIRST_VALUE:
            if (!func_expr->root->ignore_nulls && !GET_AGGR_VAR_FIR_VAL(aggr_var)->ex_has_val) {
                result->is_null = OG_TRUE;
                GET_AGGR_VAR_FIR_VAL(aggr_var)->ex_has_val = OG_TRUE;
            }
            break;
        default:
            break;
    }
}

static inline void sql_winsort_value_set_notnull(variant_t *result)
{
    if (result->is_null) {
        result->is_null = OG_FALSE;
        if (OG_IS_NUMBER_TYPE(result->type)) {
            cm_zero_dec(&result->v_dec);
        }
    }
}

static status_t og_aggr_sum_value_basic(sql_stmt_t *statement, aggr_var_t *aggr_var,
    variant_t *v_aggr, variant_t *v_add)
{
    char *buf = NULL;
    row_assist_t ra;
    bool32 var_exist;
    aggr_sum_t *data = get_aggr_var_sum(aggr_var);

    if (data != NULL && data->has_distinct) {
        OGSQL_SAVE_STACK(statement);
        sql_keep_stack_variant(statement, v_add);
        if (sql_push(statement, OG_MAX_ROW_SIZE, (void **)&buf) != OG_SUCCESS) {
            OGSQL_RESTORE_STACK(statement);
            return OG_ERROR;
        }
        row_init(&ra, buf, OG_MAX_ROW_SIZE, 1);
        if (sql_put_row_value(statement, NULL, &ra, v_add->type, v_add) != OG_SUCCESS) {
            OGSQL_RESTORE_STACK(statement);
            return OG_ERROR;
        }
        if (vm_hash_table_insert2(&var_exist, &data->ex_hash_segment, &data->ex_table_entry, buf, ra.head->size) !=
            OG_SUCCESS) {
            OGSQL_RESTORE_STACK(statement);
            return OG_ERROR;
        }

        OGSQL_RESTORE_STACK(statement);
        if (var_exist) {
            return OG_SUCCESS;
        }
    }

    return sql_aggr_sum_value(statement, v_aggr, v_add);
}

status_t sql_get_winsort_aggr_value(aggr_assist_t *aggr_ass, aggr_var_t *aggr_var, const char *buf, variant_t *vars,
    variant_t *result)
{
    switch (aggr_ass->aggr_type) {
        case AGGR_TYPE_STDDEV:
        case AGGR_TYPE_STDDEV_POP:
        case AGGR_TYPE_STDDEV_SAMP:
        case AGGR_TYPE_VARIANCE:
        case AGGR_TYPE_VAR_POP:
        case AGGR_TYPE_VAR_SAMP:
        case AGGR_TYPE_CORR:
            return sql_aggr_invoke(aggr_ass, aggr_var, vars);
        case AGGR_TYPE_COVAR_POP:
        case AGGR_TYPE_COVAR_SAMP:
            if ((vars[0].is_null) || (vars[1].is_null)) {
                return OG_SUCCESS;
            } else {
                GET_AGGR_VAR_COVAR(aggr_var)->extra.is_null = OG_FALSE;
                GET_AGGR_VAR_COVAR(aggr_var)->extra_1.is_null = OG_FALSE;
                return sql_aggr_invoke(aggr_ass, aggr_var, vars);
            }
        case AGGR_TYPE_AVG:
            GET_AGGR_VAR_AVG(aggr_var)->ex_avg_count++;
            /* fall-through */
        case AGGR_TYPE_SUM:
            sql_winsort_value_set_notnull(result);
            return og_aggr_sum_value_basic(aggr_ass->stmt, aggr_var, result, vars);

        case AGGR_TYPE_LAG:
            result->is_null = OG_FALSE;
            return sql_win_aggr_lag(aggr_ass->stmt, aggr_ass->aggr_type, result, vars, buf);

        case AGGR_TYPE_FIRST_VALUE:
            if (GET_AGGR_VAR_FIR_VAL(aggr_var)->ex_has_val) {
                return OG_SUCCESS;
            }
            GET_AGGR_VAR_FIR_VAL(aggr_var)->ex_has_val = OG_TRUE;
            return sql_win_aggr_last_value(aggr_ass->stmt, result, vars, buf, aggr_ass->aggr_node->ignore_nulls);

        case AGGR_TYPE_LAST_VALUE:
            return sql_win_aggr_last_value(aggr_ass->stmt, result, vars, buf, aggr_ass->aggr_node->ignore_nulls);

        case AGGR_TYPE_MIN:
        case AGGR_TYPE_MAX:
            return sql_win_aggr_min_or_max(aggr_ass->stmt, aggr_ass->aggr_type, result, vars, buf);

        case AGGR_TYPE_COUNT:
            return sql_win_aggr_count(aggr_ass->stmt, aggr_var, vars);

        case AGGR_TYPE_GROUP_CONCAT:
            return sql_win_aggr_listagg(aggr_ass->stmt, aggr_ass->aggr_node, aggr_ass->aggr_type, aggr_var, buf);

        default:
            return OG_ERROR;
    }
}

static status_t sql_winsort_aggr_value(sql_stmt_t *stmt, sql_aggr_type_t aggr_type, expr_tree_t *func_expr,
    aggr_var_t *aggr_var, const char *buf)
{
    variant_t vars[FO_VAL_MAX - 1];
    variant_t *result = &aggr_var->var;

    OG_RETURN_IFERR(sql_exec_expr(stmt, func_expr->root->argument, &vars[0]));
    if (aggr_type == AGGR_TYPE_CORR || aggr_type == AGGR_TYPE_COVAR_POP || aggr_type == AGGR_TYPE_COVAR_SAMP) {
        OG_RETURN_IFERR(sql_exec_expr(stmt, func_expr->root->argument->next, &vars[1]));
    } else {
        vars[1].is_null = OG_TRUE;
    }
    if (aggr_type == AGGR_TYPE_GROUP_CONCAT) {
        OG_RETURN_IFERR(sql_exec_expr(stmt, func_expr->root->argument->next, &vars[0]));
    }
    if (vars[0].is_null) {
        sql_winsort_aggr_value_null(aggr_var, func_expr, aggr_type, result);
        return OG_SUCCESS;
    }

    aggr_assist_t aggr_ass;
    SQL_INIT_AGGR_ASSIST(&aggr_ass, stmt, NULL);
    aggr_ass.aggr_type = aggr_type;
    aggr_ass.aggr_node = func_expr->root;
    return sql_get_winsort_aggr_value(&aggr_ass, aggr_var, buf, vars, result);
}

static status_t sql_verify_winsort_row_number(sql_verifier_t *verf, expr_node_t *winsort)
{
    expr_node_t *func_node = winsort->argument->root;
    if (winsort->win_args->sort_items == NULL) {
        OG_THROW_ERROR_EX(ERR_NO_ORDER_BY_CLAUSE, "%s", T2S((text_t *)&func_node->word.func.name));
        return OG_ERROR;
    }
    if (winsort->win_args->windowing != NULL) {
        OG_SRC_THROW_ERROR(func_node->loc, ERR_UNSUPPORT_FUNC, "Windowing clause is",
            "in function ROW_NUMBER/RANK/DENSE_RANK");
        return OG_ERROR;
    }
    OG_RETURN_IFERR(sql_verify_func_node(verf, func_node, 0, 0, OG_INVALID_ID32));
    winsort->datatype = OG_TYPE_INTEGER;
    winsort->size = sizeof(uint32);
    return OG_SUCCESS;
}

static inline status_t sql_winsort_calc_aggr(sql_stmt_t *stmt, sql_aggr_type_t type, aggr_var_t *aggr_var)
{
    aggr_assist_t aggr_ass;
    SQL_INIT_AGGR_ASSIST(&aggr_ass, stmt, NULL);
    aggr_ass.aggr_type = type;
    return sql_aggr_calc_value(&aggr_ass, aggr_var);
}

static status_t sql_func_winsort_row_number(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    mtrl_row_t *row = NULL;
    uint32 row_number = 0;
    bool32 grp_changed;
    bool32 ord_changed;
    uint32 rs_col_id = VALUE(uint32, &plan->winsort_p.winsort->value);

    OG_RETURN_IFERR(mtrl_open_cursor(&stmt->mtrl, cursor->mtrl.winsort_sort.sid, &cursor->mtrl.cursor));

    for (;;) {
        SQL_CHECK_SESSION_VALID_FOR_RETURN(stmt);
        OG_RETURN_IFERR(
            mtrl_fetch_winsort_rid(&stmt->mtrl, &cursor->mtrl.cursor, WINSORT_PART, &grp_changed, &ord_changed));
        if (cursor->mtrl.cursor.eof) {
            break;
        }

        row = &cursor->mtrl.cursor.row;
        if (row->lens[rs_col_id] != sizeof(uint32)) {
            OG_THROW_ERROR_EX(ERR_ASSERT_ERROR, "row->lens[rs_col_id](%u) == sizeof(uint32)",
                (uint32)row->lens[rs_col_id]);
            return OG_ERROR;
        }
        *(uint32 *)(row->data + row->offsets[rs_col_id]) = ++row_number;
        row_number = grp_changed ? 0 : row_number;
    }
    mtrl_close_cursor(&stmt->mtrl, &cursor->mtrl.cursor);
    return OG_SUCCESS;
}

static status_t sql_func_winsort_dense_rank(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    mtrl_row_t *row = NULL;
    bool32 grp_changed;
    bool32 ord_changed;
    uint32 rs_col_id = VALUE(uint32, &plan->winsort_p.winsort->value);
    uint32 dense_rank = 1;

    OG_RETURN_IFERR(mtrl_open_cursor(&stmt->mtrl, cursor->mtrl.winsort_sort.sid, &cursor->mtrl.cursor));

    for (;;) {
        SQL_CHECK_SESSION_VALID_FOR_RETURN(stmt);
        OG_RETURN_IFERR(mtrl_fetch_winsort_rid(&stmt->mtrl, &cursor->mtrl.cursor, WINSORT_PART | WINSORT_ORDER,
            &grp_changed, &ord_changed));
        if (cursor->mtrl.cursor.eof) {
            break;
        }

        row = &cursor->mtrl.cursor.row;
        if (row->lens[rs_col_id] != sizeof(uint32)) {
            OG_THROW_ERROR_EX(ERR_ASSERT_ERROR, "row->lens[rs_col_id](%u) == sizeof(uint32)",
                (uint32)row->lens[rs_col_id]);
            return OG_ERROR;
        }

        *(uint32 *)(row->data + row->offsets[rs_col_id]) = dense_rank;
        if (ord_changed) {
            dense_rank++;
        }
        dense_rank = grp_changed ? 1 : dense_rank;
    }
    mtrl_close_cursor(&stmt->mtrl, &cursor->mtrl.cursor);
    return OG_SUCCESS;
}

static status_t sql_func_winsort_rank(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    mtrl_row_t *row = NULL;
    bool32 grp_changed;
    bool32 ord_changed;
    uint32 rs_col_id = VALUE(uint32, &plan->winsort_p.winsort->value);
    uint32 rank = 1;
    uint32 equal_cnt = 1;
    bool32 last_changed = OG_FALSE;

    OG_RETURN_IFERR(mtrl_open_cursor(&stmt->mtrl, cursor->mtrl.winsort_sort.sid, &cursor->mtrl.cursor));

    for (;;) {
        SQL_CHECK_SESSION_VALID_FOR_RETURN(stmt);
        OG_RETURN_IFERR(mtrl_fetch_winsort_rid(&stmt->mtrl, &cursor->mtrl.cursor, WINSORT_PART | WINSORT_ORDER,
            &grp_changed, &ord_changed));
        if (cursor->mtrl.cursor.eof) {
            break;
        }

        row = &cursor->mtrl.cursor.row;
        if (row->lens[rs_col_id] != sizeof(uint32)) {
            OG_THROW_ERROR_EX(ERR_ASSERT_ERROR, "row->lens[rs_col_id](%u) == sizeof(uint32)",
                (uint32)row->lens[rs_col_id]);
            return OG_ERROR;
        }

        if (last_changed) {
            rank += equal_cnt;
            equal_cnt = 1;
        }
        *(uint32 *)(row->data + row->offsets[rs_col_id]) = rank;

        if (ord_changed) {
            last_changed = OG_TRUE;
        } else {
            equal_cnt++;
            last_changed = OG_FALSE;
        }

        if (grp_changed) {
            rank = 1;
            equal_cnt = 1;
            last_changed = OG_FALSE;
        }
    }
    mtrl_close_cursor(&stmt->mtrl, &cursor->mtrl.cursor);
    return OG_SUCCESS;
}

static inline void sql_winsort_row_number_default(variant_t *value)
{
    value->type = OG_TYPE_INTEGER;
    value->is_null = OG_FALSE;
    value->v_int = 0;
}

static inline status_t sql_winsort_row_number_actual(sql_stmt_t *stmt, sql_cursor_t *cursor, expr_node_t *node,
    variant_t *value)
{
    uint32 id = VALUE(uint32, &node->value);
    mtrl_row_assist_t row_assist;
    mtrl_row_init(&row_assist, &cursor->mtrl.cursor.row);
    return mtrl_get_column_value(&row_assist, cursor->mtrl.cursor.eof, id, OG_TYPE_INTEGER, OG_FALSE, value);
}

static inline int64 sql_func_ntile_bucket(int64 row_num, int64 count, int64 bucket_count)
{
    int64 temp = count / bucket_count;
    int64 each_group_count;
    int64 bucket_plus_count;
    if (temp >= 1) {
        each_group_count = temp;
        bucket_plus_count = count % bucket_count;
    } else {
        each_group_count = 1;
        bucket_plus_count = 0;
    }
    if (bucket_plus_count * (each_group_count + 1) >= row_num) {
        return (row_num - 1) / (each_group_count + 1) + 1;
    } else {
        return (row_num - bucket_plus_count - 1) / each_group_count + 1;
    }
}

static inline status_t sql_win_aggr_get_ntile(sql_stmt_t *stmt, sql_cursor_t *cursor, mtrl_rowid_t *mtrl_rid,
    variant_t *value)
{
    char *row = NULL;
    int64 *group = NULL;
    variant_t row_number;
    int64 *bucket_count = NULL;
    if (mtrl_win_aggr_get(&stmt->mtrl, cursor->mtrl.winsort_aggr.sid, (char **)&row, mtrl_rid, OG_TRUE) != OG_SUCCESS) {
        return OG_ERROR;
    }
    row_number = ((aggr_var_t *)row)->var;
    mtrl_rowid_t group_rowid_addr = *(mtrl_rowid_t *)(row + sizeof(aggr_var_t));
    if (mtrl_win_aggr_get(&stmt->mtrl, cursor->mtrl.winsort_aggr.sid, (char **)&group, &group_rowid_addr, OG_TRUE) !=
        OG_SUCCESS) {
        return OG_ERROR;
    }
    bucket_count = (int64 *)((char *)group + sizeof(int64));
    value->v_bigint = sql_func_ntile_bucket(row_number.v_bigint, *group, *bucket_count);
    value->is_null = OG_FALSE;
    value->type = OG_TYPE_BIGINT;
    return OG_SUCCESS;
}

static inline status_t sql_winsort_ntile_actual(sql_stmt_t *stmt, sql_cursor_t *cursor, expr_node_t *node,
    variant_t *value)
{
    uint32 id = VALUE(uint32, &node->value);
    mtrl_rowid_t mtrl_rid;
    mtrl_row_t *row = NULL;

    row = &cursor->mtrl.cursor.row;
    if (row->lens[id] != sizeof(mtrl_rowid_t)) {
        OG_THROW_ERROR_EX(ERR_ASSERT_ERROR, "row->lens[id](%u) == sizeof(mtrl_rowid_t)(%u)", (uint32)row->lens[id],
            (uint32)sizeof(mtrl_rowid_t));
        return OG_ERROR;
    }
    mtrl_rid = *(mtrl_rowid_t *)(row->data + row->offsets[id]);
    return sql_win_aggr_get_ntile(stmt, cursor, &mtrl_rid, value);
}

static status_t sql_verify_winsort_sum(sql_verifier_t *verif, expr_node_t *winsort)
{
    uint32 excl_flags = verif->excl_flags;
    expr_node_t *func_node = winsort->argument->root;

    OG_BIT_SET(verif->excl_flags, SQL_EXCL_AGGR);

    OG_RETURN_IFERR(sql_verify_func_node(verif, func_node, 1, 1, OG_INVALID_ID32));

    verif->excl_flags = excl_flags;

    OG_RETURN_IFERR(opr_infer_type_sum(func_node->argument->root->datatype, &func_node->typmod));

    winsort->typmod = func_node->typmod;
    return OG_SUCCESS;
}

static inline status_t sql_winsort_calc_avg(sql_stmt_t *stmt, aggr_var_t *aggr_result)
{
    variant_t v_rows;

    if (aggr_result->var.is_null) {
        return OG_SUCCESS;
    }
    v_rows.type = OG_TYPE_BIGINT;
    v_rows.is_null = OG_FALSE;
    v_rows.v_bigint = (int64)GET_AGGR_VAR_AVG(aggr_result)->ex_avg_count;
    if (v_rows.v_bigint <= 0) {
        OG_THROW_ERROR_EX(ERR_ASSERT_ERROR, "v_rows.v_bigint(%lld) > 0", v_rows.v_bigint);
        return OG_ERROR;
    }
    return opr_exec(OPER_TYPE_DIV, SESSION_NLS(stmt), &aggr_result->var, &v_rows, &aggr_result->var);
}

static inline status_t og_winsort_sum_end(sql_stmt_t *statement, aggr_var_t *aggr_result)
{
    aggr_sum_t *data = get_aggr_var_sum(aggr_result);
    OG_RETVALUE_IFTRUE(data == NULL, OG_ERROR);
    if (data->has_distinct) {
        vm_hash_segment_deinit(&data->ex_hash_segment);
    }
    return OG_SUCCESS;
}

static inline status_t sql_winsort_count_end(sql_stmt_t *stmt, aggr_var_t *aggr_result)
{
    aggr_count_t *data = GET_AGGR_VAR_COUNT(aggr_result);
    OG_RETVALUE_IFTRUE(data == NULL, OG_ERROR);
    if (data->has_distinct) {
        vm_hash_segment_deinit(&data->ex_hash_segment);
    }

    if (aggr_result->var.is_null) {
        aggr_result->var.is_null = OG_FALSE;
        aggr_result->var.v_bigint = 0;
    }
    return OG_SUCCESS;
}

status_t sql_winsort_aggr_value_end(sql_stmt_t *stmt, sql_aggr_type_t aggr_type, sql_cursor_t *cursor,
    aggr_var_t *aggr_result)
{
    switch (aggr_type) {
        case AGGR_TYPE_STDDEV:
        case AGGR_TYPE_STDDEV_POP:
        case AGGR_TYPE_STDDEV_SAMP:
        case AGGR_TYPE_VARIANCE:
        case AGGR_TYPE_VAR_POP:
        case AGGR_TYPE_VAR_SAMP:
        case AGGR_TYPE_COVAR_POP:
        case AGGR_TYPE_COVAR_SAMP:
        case AGGR_TYPE_CORR:
            return sql_winsort_calc_aggr(stmt, aggr_type, aggr_result);

        case AGGR_TYPE_AVG:
            return sql_winsort_calc_avg(stmt, aggr_result);

        case AGGR_TYPE_COUNT:
            return sql_winsort_count_end(stmt, aggr_result);
        
        case AGGR_TYPE_SUM:
            return og_winsort_sum_end(stmt, aggr_result);

        default:
            return sql_win_aggr_append_data(stmt, cursor, aggr_result);
    }
}

status_t sql_winsort_get_aggr_type(sql_stmt_t *stmt, sql_cursor_t *cursor, sql_aggr_type_t type, expr_tree_t *func_expr,
    og_type_t *datatype)
{
    variant_t value;
    if (*datatype != OG_TYPE_UNKNOWN) {
        return OG_SUCCESS;
    }

    switch (type) {
        case AGGR_TYPE_LAG:
        case AGGR_TYPE_AVG:
        case AGGR_TYPE_FIRST_VALUE:
        case AGGR_TYPE_LAST_VALUE:
        case AGGR_TYPE_MIN:
        case AGGR_TYPE_MAX:
            if (func_expr->root->argument->root->type != EXPR_NODE_GROUP) {
                OG_RETURN_IFERR(sql_exec_expr(stmt, func_expr->root->argument, &value));
                *datatype = value.type;
            } else {
                var_vm_col_t *v_vm_col = VALUE_PTR(var_vm_col_t, &func_expr->root->argument->root->value);
                og_type_t *types = (og_type_t *)((char *)cursor->mtrl.winsort_rs.buf + sizeof(uint32));
                *datatype = types[v_vm_col->id];
            }
            break;
        default:
            OG_THROW_ERROR(ERR_SQL_SYNTAX_ERROR, "cannot get aggregation datatype");
    }
    return OG_SUCCESS;
}

static status_t sql_func_winsort_aggr_default(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan,
    sql_aggr_type_t type, const char *buf)
{
    mtrl_row_t *row = NULL;
    aggr_var_t *old_aggr = NULL;
    mtrl_rowid_t mtrl_rid;
    aggr_var_t *aggr_result = NULL;
    bool32 grp_changed;
    bool32 ord_changed;
    status_t status = OG_ERROR;
    sql_cursor_t *query_cursor = OGSQL_CURR_CURSOR(stmt);
    uint32 rs_col_id = VALUE(uint32, &plan->winsort_p.winsort->value);
    expr_tree_t *func = plan->winsort_p.winsort->argument;
    og_type_t datatype = func->root->datatype;
    uint32 cmp_flag = (type == AGGR_TYPE_GROUP_CONCAT) ? WINSORT_PART : (WINSORT_PART | WINSORT_ORDER);

    OG_RETURN_IFERR(sql_winsort_get_aggr_type(stmt, cursor, type, func, &datatype));
    OG_RETURN_IFERR(sql_win_aggr_var_alloc(stmt, type, query_cursor, &aggr_result, datatype, &mtrl_rid, func));
    OG_RETURN_IFERR(mtrl_open_cursor(&stmt->mtrl, cursor->mtrl.winsort_sort.sid, &cursor->mtrl.cursor));
    OG_RETURN_IFERR(SQL_CURSOR_PUSH(stmt, cursor));

    for (;;) {
        SQL_CHECK_SESSION_VALID_FOR_RETURN(stmt);
        OG_BREAK_IF_ERROR(mtrl_fetch_winsort_rid(&stmt->mtrl, &cursor->mtrl.cursor, cmp_flag, &grp_changed,
            &ord_changed));
        if (cursor->mtrl.cursor.eof) {
            status = sql_winsort_aggr_value_end(stmt, type, query_cursor, aggr_result);
            break;
        }
        row = &cursor->mtrl.cursor.row;
        if (row->lens[rs_col_id] != sizeof(mtrl_rowid_t)) {
            OG_THROW_ERROR_EX(ERR_ASSERT_ERROR, "row->lens[rs_col_id](%u) == sizeof(mtrl_rowid_t)(%u)",
                (uint32)row->lens[rs_col_id], (uint32)sizeof(mtrl_rowid_t));
            break;
        }

        // save vmid to rs row, maybe more rs row has the same vmid
        *(mtrl_rowid_t *)(row->data + row->offsets[rs_col_id]) = mtrl_rid;

        OG_BREAK_IF_ERROR(sql_winsort_aggr_value(stmt, type, func, aggr_result, buf));

        if (grp_changed || ord_changed) {
            OGSQL_SAVE_STACK(stmt);
            if (ord_changed) {
                OG_BREAK_IF_ERROR(sql_stack_alloc_aggr_var(stmt, type, (void **)&old_aggr));
                OG_BREAK_IF_ERROR(sql_copy_aggr(type, aggr_result, old_aggr));
            }
            OG_BREAK_IF_ERROR(sql_winsort_aggr_value_end(stmt, type, query_cursor, aggr_result));
            // alloc next aggr row
            OG_BREAK_IF_ERROR(sql_win_aggr_var_alloc(stmt, type, query_cursor, &aggr_result, datatype, &mtrl_rid,
                func));
            if (ord_changed) {
                OG_BREAK_IF_ERROR(sql_copy_aggr(type, old_aggr, aggr_result));
            }
            OGSQL_RESTORE_STACK(stmt);
        }
    }
    SQL_CURSOR_POP(stmt);
    mtrl_close_cursor(&stmt->mtrl, &cursor->mtrl.cursor);
    return status;
}

static inline status_t sql_func_winsort_aggr(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan,
    sql_aggr_type_t type, const char *buffer)
{
    if (plan->winsort_p.winsort->win_args->windowing == NULL) {
        return sql_func_winsort_aggr_default(stmt, cursor, plan, type, buffer);
    }
    if (plan->winsort_p.winsort->win_args->windowing->is_range) {
        return sql_func_winsort_aggr_range(stmt, cursor, plan, type, buffer);
    }
    return sql_func_winsort_aggr_rows(stmt, cursor, plan, type, buffer);
}


static status_t sql_win_aggr_func_lag_offset(sql_stmt_t *stmt, expr_tree_t *arg_expr, int32 *aggr_offset)
{
    variant_t arg_variant;
    expr_node_t *node = NULL;
    int32 offset;

    node = arg_expr->root;
    if (node->type != EXPR_NODE_PARAM && node->type != EXPR_NODE_CONST) {
        OG_THROW_ERROR(ERR_INVALID_NUMBER, "-- not supprort the offset expr type");
        return OG_ERROR;
    }
    OG_RETURN_IFERR(sql_get_expr_node_value(stmt, node, &arg_variant));

    if (arg_variant.is_null == OG_TRUE) {
        OG_THROW_ERROR(ERR_INVALID_NUMBER, "-- text is empty or too long");
        return OG_ERROR;
    }

    if (var_as_floor_integer(&arg_variant) != OG_SUCCESS) {
        return OG_ERROR;
    }
    offset = VALUE(int32, &arg_variant);
    if (offset < 0) {
        OG_THROW_ERROR(ERR_FUNC_ARGUMENT_OUT_OF_RANGE, 2, "out of range");
        return OG_ERROR;
    }

    *(int32 *)aggr_offset = offset;
    return OG_SUCCESS;
}

static status_t sql_win_aggr_verify_ntile_bucket(sql_stmt_t *stmt, expr_tree_t *arg_expr, int64 *arg)
{
    variant_t arg_variant;
    expr_node_t *node = NULL;
    int64 bucket_count;

    node = arg_expr->root;
    OG_RETURN_IFERR(sql_get_expr_node_value(stmt, node, &arg_variant));

    if (arg_variant.is_null) {
        OG_THROW_ERROR(ERR_INVALID_NUMBER, "-- bucket count can not be null");
        return OG_ERROR;
    }

    if (arg_variant.type == OG_TYPE_BOOLEAN) {
        OG_THROW_ERROR(ERR_INVALID_NUMBER, "-- bucket type can not be bool");
        return OG_ERROR;
    }

    if (var_as_floor_bigint(&arg_variant) != OG_SUCCESS) {
        return OG_ERROR;
    }
    bucket_count = VALUE(int64, &arg_variant);
    if (bucket_count <= 0) {
        OG_THROW_ERROR(ERR_FUNC_ARGUMENT_OUT_OF_RANGE, 1, "out of range");
        return OG_ERROR;
    }

    if (arg != NULL) {
        *arg = bucket_count;
    }
    return OG_SUCCESS;
}

static status_t sql_func_winsort_init_slider(sql_stmt_t *stmt, sql_aggr_type_t type, sql_cursor_t *query_cursor,
    expr_tree_t *expr, winsort_slider_t *aggr_silder, expr_node_t *expr_node)
{
    mtrl_rowid_t mtrl_rid;
    aggr_var_t *aggr_result = NULL;

    OG_RETURN_IFERR(sql_win_aggr_alloc(stmt, type, query_cursor, &aggr_result, &mtrl_rid));
    aggr_result->var.is_null = OG_TRUE;

    aggr_silder->rid = mtrl_rid;
    if (expr != NULL) {
        aggr_result->var.type = expr->root->datatype;
        OG_RETURN_IFERR(sql_get_expr_node_value(stmt, expr->root, &aggr_result->var));
    } else {
        aggr_result->var.type = expr_node->datatype;
    }

    if (!aggr_result->var.is_null) {
        OG_RETURN_IFERR(sql_win_aggr_append_data(stmt, query_cursor, aggr_result));
    }

    return OG_SUCCESS;
}

status_t sql_win_aggr_var_alloc(sql_stmt_t *stmt, sql_aggr_type_t aggr_type, sql_cursor_t *cursor,
    aggr_var_t **aggr_var, og_type_t datatype, mtrl_rowid_t *rid, expr_tree_t *func_expr)
{
    if (aggr_type == AGGR_TYPE_COUNT) {
        OG_RETURN_IFERR(sql_win_aggr_count_alloc(stmt, aggr_type, func_expr, cursor, aggr_var, rid));
    } else if (aggr_type == AGGR_TYPE_SUM) {
        OG_RETURN_IFERR(sql_win_aggr_page_alloc(stmt, aggr_type, cursor, aggr_var, sizeof(aggr_sum_t), rid));
        OG_RETURN_IFERR(og_win_aggr_sum_alloc(stmt, func_expr, aggr_var));
    } else {
        OG_RETURN_IFERR(sql_win_aggr_alloc(stmt, aggr_type, cursor, aggr_var, rid));
    }
    (*aggr_var)->var.is_null = OG_TRUE;
    (*aggr_var)->var.type = datatype;
    return OG_SUCCESS;
}

static status_t sql_func_winsort_make_result(sql_stmt_t *stmt, sql_cursor_t *query_cursor, expr_tree_t *expr,
    winsort_slider_t *aggr_silder, sql_aggr_type_t type, const char *buf)
{
    mtrl_rowid_t rid;
    aggr_var_t *aggr_result = NULL;

    og_type_t datatype = expr->root->datatype;
    OG_RETURN_IFERR(sql_winsort_get_aggr_type(stmt, query_cursor, type, expr, &datatype));
    OG_RETURN_IFERR(sql_win_aggr_var_alloc(stmt, type, query_cursor, &aggr_result, datatype, &rid, NULL));
    aggr_silder->rid = rid;
    OG_RETURN_IFERR(sql_winsort_aggr_value(stmt, type, expr, aggr_result, buf));
    OG_RETURN_IFERR(sql_win_aggr_append_data(stmt, query_cursor, aggr_result));

    return OG_SUCCESS;
}

static status_t sql_func_winsort_check_arg(sql_stmt_t *stmt, expr_tree_t *root_expr, int32 *aggr_offset)
{
    int32 offset;
    expr_tree_t *offset_expr = NULL;
    offset_expr = root_expr->next;
    if (offset_expr != NULL) {
        OG_RETURN_IFERR(sql_win_aggr_func_lag_offset(stmt, offset_expr, &offset));
        *aggr_offset = offset;
    } else {
        *aggr_offset = 1;
    }

    return OG_SUCCESS;
}

static status_t sql_func_winsort_cume_dist_row(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan,
    sql_aggr_type_t type, const char *buf)
{
    mtrl_row_t *row = NULL;
    uint32 rs_col_id = VALUE(uint32, &plan->winsort_p.winsort->value);
    bool32 grp_changed;
    bool32 ord_changed;
    uint32 rnd = 0;
    sql_cursor_t *query_cursor = OGSQL_CURR_CURSOR(stmt);
    aggr_var_t *aggr_group = NULL;
    aggr_var_t *aggr_row_cume = NULL;
    mtrl_rowid_t rid_group;
    mtrl_rowid_t rid_row_cume;
    mtrl_rowid_t *rid_group_addr = NULL;
    vm_page_t *group_page = NULL;

    OG_RETURN_IFERR(sql_win_aggr_alloc(stmt, type, query_cursor, &aggr_group, &rid_group));
    OG_RETURN_IFERR(vm_open(stmt->mtrl.session, stmt->mtrl.pool, rid_group.vmid, &group_page));
    OG_RETURN_IFERR(sql_win_aggr_alloc(stmt, type, query_cursor, &aggr_row_cume, &rid_row_cume));
    aggr_group->var.is_null = OG_TRUE;
    aggr_group->var.type = OG_TYPE_BIGINT;
    aggr_group->var.v_bigint = 0;

    OG_RETURN_IFERR(mtrl_open_cursor(&stmt->mtrl, cursor->mtrl.winsort_sort.sid, &cursor->mtrl.cursor));
    OG_RETURN_IFERR(SQL_CURSOR_PUSH(stmt, cursor));

    for (;;) {
        SQL_CHECK_SESSION_VALID_FOR_RETURN(stmt);
        OG_RETURN_IFERR(mtrl_fetch_winsort_rid(&stmt->mtrl, &cursor->mtrl.cursor, WINSORT_PART | WINSORT_ORDER,
            &grp_changed, &ord_changed));
        if (cursor->mtrl.cursor.eof) {
            aggr_group->var.is_null = OG_FALSE;
            aggr_group->var.v_bigint = rnd;
            vm_close(stmt->mtrl.session, stmt->mtrl.pool, rid_group.vmid, VM_ENQUE_TAIL);
            break;
        }

        rnd++;

        row = &cursor->mtrl.cursor.row;
        if (row->lens[rs_col_id] != sizeof(mtrl_rowid_t)) {
            OG_THROW_ERROR_EX(ERR_ASSERT_ERROR, "row->lens[rs_col_id](%u) == sizeof(mtrl_rowid_t)",
                (uint32)row->lens[rs_col_id]);
            return OG_ERROR;
        }

        // this col set rid = rid_row_cume
        *(mtrl_rowid_t *)(row->data + row->offsets[rs_col_id]) = rid_row_cume;

        // set this col value : aggr_row_cume , and its offset to the rid_group
        aggr_row_cume->var.is_null = OG_FALSE;
        aggr_row_cume->var.type = OG_TYPE_BIGINT;
        aggr_row_cume->var.v_bigint = rnd;
        rid_group_addr = (mtrl_rowid_t *)(((char *)aggr_row_cume) + aggr_row_cume->extra_offset);
        *rid_group_addr = rid_group;

        if (grp_changed || ord_changed) {
            if (grp_changed) {
                aggr_group->var.is_null = OG_FALSE;
                aggr_group->var.v_bigint = rnd;
                rnd = 0;
                vm_close(stmt->mtrl.session, stmt->mtrl.pool, rid_group.vmid, VM_ENQUE_TAIL);
                OG_RETURN_IFERR(sql_win_aggr_alloc(stmt, type, query_cursor, &aggr_group, &rid_group));
                OG_RETURN_IFERR(vm_open(stmt->mtrl.session, stmt->mtrl.pool, rid_group.vmid, &group_page));
                aggr_group->var.is_null = OG_FALSE;
                aggr_group->var.type = OG_TYPE_BIGINT;
                aggr_group->var.v_bigint = 0;
            }
            OG_RETURN_IFERR(sql_win_aggr_alloc(stmt, type, query_cursor, &aggr_row_cume, &rid_row_cume));
            aggr_row_cume->var.is_null = OG_FALSE;
            aggr_row_cume->var.type = OG_TYPE_BIGINT;
            aggr_row_cume->var.v_bigint = 0;
        }
    }
    SQL_CURSOR_POP(stmt);
    mtrl_close_cursor(&stmt->mtrl, &cursor->mtrl.cursor);
    return OG_SUCCESS;
}

static status_t sql_func_winsort_lag_row(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan,
    sql_aggr_type_t type, const char *buf)
{
    status_t status = OG_ERROR;
    mtrl_row_t *row = NULL;
    winsort_slider_t *aggr_silder = NULL;
    winsort_slider_t *cur_slider = NULL;
    bool32 grp_changed;
    bool32 ord_changed;
    sql_cursor_t *query_cursor = OGSQL_CURR_CURSOR(stmt);
    uint32 rs_col_id = VALUE(uint32, &plan->winsort_p.winsort->value);
    int32 aggr_offset = 0;
    uint32 ind = 0;
    uint32 page_cnt = 0;
    uint32 map_idx = 0;
    uint32 *page_maps = NULL;
    uint32 page_offset;
    uint32 slider_seg_id;
    expr_tree_t *func_expr = plan->winsort_p.winsort->argument;
    expr_tree_t *root_expr = func_expr->root->argument;
    mtrl_rowid_t offset_rid;
    expr_tree_t *default_expr = root_expr->next == NULL ? NULL : root_expr->next->next;

    OG_RETURN_IFERR(sql_func_winsort_check_arg(stmt, root_expr, &aggr_offset));
    if (aggr_offset == 0) {
        return sql_func_winsort_aggr(stmt, cursor, plan, AGGR_TYPE_LAG, buf);
    }
    uint32 max_cnt = sql_win_page_max_count();
    // aggr-offset greater than 0 in check:sql_win_aggr_func_lag_offset
    uint32 act_cnt = MIN((uint32)aggr_offset, max_cnt);
    uint32 min_len = act_cnt * sizeof(winsort_slider_t);
    uint32 max_pages = CM_ALIGN8((uint32)aggr_offset / max_cnt + (uint32)1);
    if (max_pages > MAX_PAGE_LIST) {
        OG_THROW_ERROR(ERR_FUNC_ARGUMENT_OUT_OF_RANGE, 2, "out of range");
        return OG_ERROR;
    }

    OG_RETURN_IFERR(sql_push(stmt, max_pages * sizeof(uint32), (void **)&page_maps));
    OG_RETURN_IFERR(mtrl_open_cursor(&stmt->mtrl, cursor->mtrl.winsort_sort.sid, &cursor->mtrl.cursor));
    OG_RETURN_IFERR(SQL_CURSOR_PUSH(stmt, cursor));
    // winsort slider
    OG_RETURN_IFERR(mtrl_create_segment(&stmt->mtrl, MTRL_SEGMENT_WINSORT_AGGR, NULL, &slider_seg_id));
    OG_RETURN_IFERR(mtrl_open_segment(&stmt->mtrl, slider_seg_id));
    OG_RETURN_IFERR(sql_win_slider_alloc(stmt, slider_seg_id, min_len, &offset_rid));
    page_maps[page_cnt++] = offset_rid.vmid;

    for (;;) {
        SQL_CHECK_SESSION_VALID_FOR_RETURN(stmt);
        OG_BREAK_IF_ERROR(mtrl_fetch_winsort_rid(&stmt->mtrl, &cursor->mtrl.cursor, WINSORT_PART | WINSORT_ORDER,
            &grp_changed, &ord_changed));
        if (cursor->mtrl.cursor.eof) {
            status = OG_SUCCESS;
            break;
        }

        row = &cursor->mtrl.cursor.row;
        if (row->lens[rs_col_id] != sizeof(mtrl_rowid_t)) {
            OG_THROW_ERROR_EX(ERR_ASSERT_ERROR, "row->lens[rs_col_id](%u) == sizeof(mtrl_rowid_t)(%u)",
                (uint32)row->lens[rs_col_id], (uint32)sizeof(mtrl_rowid_t));
            status = OG_ERROR;
            break;
        }

        get_page_and_offset(act_cnt, max_cnt, aggr_offset, ind, &map_idx, &page_offset);
        if (map_idx == page_cnt) {
            OG_RETURN_IFERR(sql_win_slider_alloc(stmt, slider_seg_id, min_len, &offset_rid));
            page_maps[page_cnt++] = offset_rid.vmid;
        }

        if (get_page_from_maps(stmt, slider_seg_id, (winsort_slider_t **)&aggr_silder, page_maps, map_idx) !=
            OG_SUCCESS) {
            OG_THROW_ERROR_EX(ERR_ASSERT_ERROR, "get page addr error ,  page_idx = %u", map_idx);
            status = OG_ERROR;
            break;
        }

        cur_slider = &aggr_silder[page_offset];
        if (ind < (uint32)aggr_offset) {
            OG_BREAK_IF_ERROR(
                sql_func_winsort_init_slider(stmt, type, query_cursor, default_expr, cur_slider, root_expr->root));
            *(mtrl_rowid_t *)(row->data + row->offsets[rs_col_id]) = cur_slider->rid;
        } else {
            *(mtrl_rowid_t *)(row->data + row->offsets[rs_col_id]) = cur_slider->rid;
        }

        OG_BREAK_IF_ERROR(sql_func_winsort_make_result(stmt, query_cursor, func_expr, cur_slider, type, buf));
        ind++;

        if (grp_changed) {
            ind = 0;
        }
    }

    SQL_CURSOR_POP(stmt);
    cm_pop(stmt->session->stack);
    sql_winsort_release_pages(stmt, slider_seg_id, page_maps, page_cnt);
    mtrl_close_cursor(&stmt->mtrl, &cursor->mtrl.cursor);
    return status;
}

static status_t sql_func_winsort_ntile_core(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan,
    sql_aggr_type_t type)
{
    mtrl_row_t *row = NULL;
    uint32 rs_col_id = VALUE(uint32, &plan->winsort_p.winsort->value);
    bool32 grp_changed = OG_FALSE;
    bool32 ord_changed = OG_FALSE;
    sql_cursor_t *query_cursor = OGSQL_CURR_CURSOR(stmt);
    int64 *group_row = NULL;
    char *aggr_row_number = NULL;
    mtrl_rowid_t rid_group;
    mtrl_rowid_t rid_row_num;
    mtrl_rowid_t *rid_group_addr = NULL;
    int64 group = 0;
    int64 bucket;
    bool32 need_verify = OG_TRUE;
    expr_tree_t *arg = plan->winsort_p.winsort->argument->root->argument;

    OG_RETURN_IFERR(mtrl_win_aggr_alloc(&stmt->mtrl, query_cursor->mtrl.winsort_aggr.sid, (void **)&group_row,
        sizeof(int64) + sizeof(int64), &rid_group, OG_TRUE));
    MEMS_RETURN_IFERR(memset_s(group_row, sizeof(int64) + sizeof(int64), 0, sizeof(int64) + sizeof(int64)));
    OG_RETURN_IFERR(mtrl_open_cursor(&stmt->mtrl, cursor->mtrl.winsort_sort.sid, &cursor->mtrl.cursor));
    OG_RETURN_IFERR(SQL_CURSOR_PUSH(stmt, cursor));

    for (;;) {
        SQL_CHECK_SESSION_VALID_FOR_RETURN(stmt);
        OG_RETURN_IFERR(sql_win_aggr_alloc(stmt, type, query_cursor, (aggr_var_t **)&aggr_row_number, &rid_row_num));
        OG_RETURN_IFERR(
            mtrl_fetch_winsort_rid(&stmt->mtrl, &cursor->mtrl.cursor, WINSORT_PART, &grp_changed, &ord_changed));

        if (cursor->mtrl.cursor.eof) {
            OG_RETURN_IFERR(
                sql_win_aggr_ntile_set(stmt, &rid_group, &group, &bucket, query_cursor->mtrl.winsort_aggr.sid));
            break;
        }

        if (need_verify) {
            OG_RETURN_IFERR(sql_win_aggr_verify_ntile_bucket(stmt, arg, &bucket));
            need_verify = OG_FALSE;
        }

        row = &cursor->mtrl.cursor.row;
        if (row->lens[rs_col_id] != sizeof(mtrl_rowid_t)) {
            OG_THROW_ERROR_EX(ERR_ASSERT_ERROR, "row->lens[rs_col_id](%u) == sizeof(mtrl_rowid_t)",
                (uint32)row->lens[rs_col_id]);
            return OG_ERROR;
        }
        *(mtrl_rowid_t *)(row->data + row->offsets[rs_col_id]) = rid_row_num;

        group++;
        ((aggr_var_t *)aggr_row_number)->var.type = OG_TYPE_BIGINT;
        ((aggr_var_t *)aggr_row_number)->var.v_bigint = group;
        rid_group_addr = (mtrl_rowid_t *)(aggr_row_number + ((aggr_var_t *)aggr_row_number)->extra_offset);

        *rid_group_addr = rid_group;

        if (grp_changed) {
            need_verify = OG_TRUE;
            OG_RETURN_IFERR(
                sql_win_aggr_ntile_set(stmt, &rid_group, &group, &bucket, query_cursor->mtrl.winsort_aggr.sid));
            OG_RETURN_IFERR(mtrl_win_aggr_alloc(&stmt->mtrl, query_cursor->mtrl.winsort_aggr.sid, (void **)&group_row,
                sizeof(int64) + sizeof(int64), &rid_group, OG_TRUE));
            MEMS_RETURN_IFERR(memset_s(group_row, sizeof(int64) + sizeof(int64), 0, sizeof(int64) + sizeof(int64)));
            group = 0;
        }
    }
    SQL_CURSOR_POP(stmt);
    mtrl_close_cursor(&stmt->mtrl, &cursor->mtrl.cursor);
    return OG_SUCCESS;
}

static status_t sql_func_winsort_sum(sql_stmt_t *stmt, sql_cursor_t *cur, plan_node_t *plan)
{
    return sql_func_winsort_aggr(stmt, cur, plan, AGGR_TYPE_SUM, NULL);
}

static status_t sql_func_winsort_stddev(sql_stmt_t *stmt, sql_cursor_t *cur, plan_node_t *plan)
{
    return sql_func_winsort_aggr(stmt, cur, plan, AGGR_TYPE_STDDEV, NULL);
}

static status_t sql_func_winsort_stddev_pop(sql_stmt_t *stmt, sql_cursor_t *cur, plan_node_t *plan)
{
    return sql_func_winsort_aggr(stmt, cur, plan, AGGR_TYPE_STDDEV_POP, NULL);
}

static status_t sql_func_winsort_stddev_samp(sql_stmt_t *stmt, sql_cursor_t *cur, plan_node_t *plan)
{
    return sql_func_winsort_aggr(stmt, cur, plan, AGGR_TYPE_STDDEV_SAMP, NULL);
}

static status_t sql_func_winsort_variance(sql_stmt_t *stmt, sql_cursor_t *cur, plan_node_t *plan)
{
    return sql_func_winsort_aggr(stmt, cur, plan, AGGR_TYPE_VARIANCE, NULL);
}

static status_t sql_func_winsort_var_pop(sql_stmt_t *stmt, sql_cursor_t *cur, plan_node_t *plan)
{
    return sql_func_winsort_aggr(stmt, cur, plan, AGGR_TYPE_VAR_POP, NULL);
}

static status_t sql_func_winsort_var_samp(sql_stmt_t *stmt, sql_cursor_t *cur, plan_node_t *plan)
{
    return sql_func_winsort_aggr(stmt, cur, plan, AGGR_TYPE_VAR_SAMP, NULL);
}

static status_t sql_func_winsort_covar_pop(sql_stmt_t *stmt, sql_cursor_t *cur, plan_node_t *plan)
{
    return sql_func_winsort_aggr(stmt, cur, plan, AGGR_TYPE_COVAR_POP, NULL);
}

static status_t sql_func_winsort_covar_samp(sql_stmt_t *stmt, sql_cursor_t *cur, plan_node_t *plan)
{
    return sql_func_winsort_aggr(stmt, cur, plan, AGGR_TYPE_COVAR_SAMP, NULL);
}

static status_t sql_func_winsort_corr(sql_stmt_t *stmt, sql_cursor_t *cur, plan_node_t *plan)
{
    return sql_func_winsort_aggr(stmt, cur, plan, AGGR_TYPE_CORR, NULL);
}

static status_t sql_verify_winsort_covar_or_corr(sql_verifier_t *verif, expr_node_t *winsort)
{
    uint32 excl_flags = verif->excl_flags;
    expr_node_t *func_node = winsort->argument->root;

    OG_BIT_SET(verif->excl_flags, SQL_EXCL_AGGR);

    OG_RETURN_IFERR(sql_verify_func_node(verif, func_node, 2, 2, OG_INVALID_ID32));
    if (func_node->dis_info.need_distinct) {
        OG_SRC_THROW_ERROR(func_node->argument->loc, ERR_SQL_SYNTAX_ERROR,
            "DISTINCT option not allowed for this function");
        return OG_ERROR;
    }

    verif->excl_flags = excl_flags;
    func_node->datatype = func_node->argument->root->datatype;
    winsort->datatype = OG_TYPE_NUMBER;
    winsort->size = OG_MAX_DEC_OUTPUT_ALL_PREC;
    return OG_SUCCESS;
}

static status_t sql_verify_winsort_min_max(sql_verifier_t *verif, expr_node_t *winsort)
{
    uint32 excl_flags = verif->excl_flags;
    expr_node_t *func_node = winsort->argument->root;

    OG_BIT_SET(verif->excl_flags, SQL_EXCL_AGGR);

    OG_RETURN_IFERR(sql_verify_func_node(verif, func_node, 1, 1, OG_INVALID_ID32));

    verif->excl_flags = excl_flags;
    func_node->datatype = func_node->argument->root->datatype;
    winsort->datatype = func_node->datatype;
    winsort->size = (uint16)cm_get_datatype_strlen(func_node->datatype, func_node->argument->root->size);
    return OG_SUCCESS;
}

static status_t sql_verify_winsort_first_last_value(sql_verifier_t *verif, expr_node_t *winsort)
{
    return sql_verify_winsort_min_max(verif, winsort);
}

static status_t sql_clone_within_group_args(void *ogx, expr_node_t *src_args, winsort_args_t **dst_args,
    ga_alloc_func_t sql_alloc_mem)
{
    if (src_args->sort_items != NULL) {
        sort_item_t *item = NULL;
        sort_item_t *new_item = NULL;

        OG_RETURN_IFERR(sql_alloc_mem(ogx, sizeof(galist_t), (void **)&(*dst_args)->sort_items));
        cm_galist_init((*dst_args)->sort_items, ogx, sql_alloc_mem);
        for (uint32 i = 0; i < src_args->sort_items->count; i++) {
            item = (sort_item_t *)cm_galist_get(src_args->sort_items, i);
            OG_RETURN_IFERR(cm_galist_new((*dst_args)->sort_items, sizeof(sort_item_t), (void **)&new_item));
            OG_RETURN_IFERR(sql_clone_expr_tree(ogx, item->expr, &new_item->expr, sql_alloc_mem));
            new_item->direction = item->direction;
            new_item->nulls_pos = item->nulls_pos;
            new_item->sort_mode = item->sort_mode;
        }
        (*dst_args)->sort_columns += src_args->sort_items->count;
    }
    return OG_SUCCESS;
}

static status_t sql_verify_winsort_listagg(sql_verifier_t *verif, expr_node_t *winsort)
{
    uint32 excl_flags = verif->excl_flags;
    expr_node_t *func = winsort->argument->root;

    OG_BIT_SET(verif->excl_flags, SQL_EXCL_AGGR);
    OG_RETURN_IFERR(sql_verify_listagg(verif, func));
    if (winsort->win_args->sort_items != NULL) {
        OG_SRC_THROW_ERROR_EX(func->argument->loc, ERR_SQL_SYNTAX_ERROR, " over of %s does not allow order by ",
            T2S(&func->word.func.name));
        return OG_ERROR;
    }
    OG_RETURN_IFERR(sql_clone_within_group_args(verif->stmt->context, func, &winsort->win_args, sql_alloc_mem));
    verif->excl_flags = excl_flags;
    winsort->datatype = func->datatype;
    winsort->size = (uint16)cm_get_datatype_strlen(func->datatype, func->size);
    return OG_SUCCESS;
}

static inline bool8 sql_node_bigint_is_int32(expr_node_t *node)
{
    return node->datatype == OG_TYPE_BIGINT && node->type == EXPR_NODE_CONST &&
        node->value.v_bigint <= (int64)OG_MAX_INT32 && node->value.v_bigint >= (int64)OG_MIN_INT32;
}

static status_t sql_verify_winsort_lag_row(sql_verifier_t *verif, expr_node_t *winsort)
{
    uint32 excl_flags = verif->excl_flags;
    expr_node_t *func_node = winsort->argument->root;
    expr_tree_t *default_expr = NULL;
    expr_node_t *default_root = NULL;
    uint16 default_expr_len = 0;
    text_buf_t buffer;
    char *buf = NULL;

    if (winsort->win_args->windowing != NULL) {
        OG_SRC_THROW_ERROR(func_node->loc, ERR_UNSUPPORT_FUNC, "Windowing clause is", "in function LEAD or LAG");
        return OG_ERROR;
    }

    OG_BIT_SET(verif->excl_flags, SQL_EXCL_AGGR);
    OG_RETURN_IFERR(sql_verify_func_node(verif, func_node, 1, 3, OG_INVALID_ID32));

    verif->excl_flags = excl_flags;
    func_node->datatype = func_node->argument->root->datatype;
    winsort->datatype = func_node->datatype;
    winsort->size = (uint16)cm_get_datatype_strlen(func_node->datatype, func_node->argument->root->size);

    if ((func_node->value.v_func.arg_cnt == 3)) {
        default_expr = func_node->argument->next->next;
        if ((OG_IS_STRING_TYPE(func_node->datatype))) {
            default_expr_len = cm_get_datatype_strlen(default_expr->root->datatype, default_expr->root->size);
            if (sql_node_bigint_is_int32(default_expr->root)) {
                default_expr_len = OG_MAX_INT32_STRLEN;
            }

            if (default_expr_len > winsort->size) {
                OG_THROW_ERROR(ERR_SQL_SYNTAX_ERROR, "the length of default value as string is larger");
                return OG_ERROR;
            }
        }

        default_root = default_expr->root;
        if (default_root->type == EXPR_NODE_CONST) {
            OGSQL_SAVE_STACK(verif->stmt);
            if (sql_push(verif->stmt, OG_STRING_BUFFER_SIZE, (void **)&buf) != OG_SUCCESS) {
                OGSQL_RESTORE_STACK(verif->stmt);
                return OG_ERROR;
            }

            CM_INIT_TEXTBUF(&buffer, OG_STRING_BUFFER_SIZE, buf);
            if (var_convert(SESSION_NLS(verif->stmt), &default_root->value, winsort->datatype, (text_buf_t *)&buffer) !=
                OG_SUCCESS) {
                OG_THROW_ERROR(ERR_INVALID_DATA_TYPE, "--invalid datatype");
                OGSQL_RESTORE_STACK(verif->stmt);
                return OG_ERROR;
            }
            OGSQL_RESTORE_STACK(verif->stmt);
        }
    }

    return OG_SUCCESS;
}

static status_t sql_verify_winsort_lead_row(sql_verifier_t *verif, expr_node_t *winsort)
{
    return sql_verify_winsort_lag_row(verif, winsort);
}

static status_t sql_verify_winsort_ntile(sql_verifier_t *verif, expr_node_t *winsort)
{
    uint32 excl_flags = verif->excl_flags;
    expr_node_t *func_node = winsort->argument->root;
    expr_tree_t *root_node = func_node->argument;

    if (winsort->win_args->sort_items == NULL) {
        OG_THROW_ERROR(ERR_NO_ORDER_BY_CLAUSE, "no order by clause");
        return OG_ERROR;
    }
    if (winsort->win_args->windowing != NULL) {
        OG_SRC_THROW_ERROR(func_node->loc, ERR_UNSUPPORT_FUNC, "Windowing clause is", "in function NTILE");
        return OG_ERROR;
    }

    OG_BIT_SET(verif->excl_flags, SQL_EXCL_AGGR | SQL_EXCL_ROWID | SQL_EXCL_ROWSCN | SQL_EXCL_ARRAY);
    if (sql_verify_func_node(verif, func_node, 1, 1, OG_INVALID_ID32) != OG_SUCCESS) {
        return OG_ERROR;
    }

    verif->excl_flags = excl_flags;
    func_node->datatype = func_node->argument->root->datatype;
    if (root_node->root->type == EXPR_NODE_CONST) {
        OG_RETURN_IFERR(sql_win_aggr_verify_ntile_bucket(verif->stmt, root_node, NULL));
    }
    winsort->datatype = OG_TYPE_BIGINT;
    winsort->size = (uint16)sizeof(int64);
    return OG_SUCCESS;
}

static status_t sql_func_winsort_first_value(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    char *buffer = NULL;
    status_t status;

    OG_RETURN_IFERR(sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&buffer));
    status = sql_func_winsort_aggr(stmt, cursor, plan, AGGR_TYPE_FIRST_VALUE, buffer);
    cm_pop(stmt->session->stack);
    return status;
}

static status_t sql_func_winsort_last_value(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    char *buffer = NULL;
    status_t status;

    OG_RETURN_IFERR(sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&buffer));
    status = sql_func_winsort_aggr(stmt, cursor, plan, AGGR_TYPE_LAST_VALUE, buffer);
    cm_pop(stmt->session->stack);
    return status;
}

static status_t sql_func_winsort_max(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    char *buffer = NULL;
    status_t status;

    OG_RETURN_IFERR(sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&buffer));
    status = sql_func_winsort_aggr(stmt, cursor, plan, AGGR_TYPE_MAX, buffer);
    cm_pop(stmt->session->stack);
    return status;
}

static status_t sql_func_winsort_cume_dist(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    char *buffer = NULL;
    status_t status;

    OG_RETURN_IFERR(sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&buffer));
    status = sql_func_winsort_cume_dist_row(stmt, cursor, plan, AGGR_TYPE_CUME_DIST, buffer);
    cm_pop(stmt->session->stack);
    return status;
}

static status_t sql_func_winsort_lag(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    char *buffer = NULL;
    status_t status;

    OG_RETURN_IFERR(sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&buffer));
    status = sql_func_winsort_lag_row(stmt, cursor, plan, AGGR_TYPE_LAG, buffer);
    cm_pop(stmt->session->stack);
    return status;
}

static status_t sql_func_winsort_lead(sql_stmt_t *stmt, sql_cursor_t *cur, plan_node_t *plan)
{
    return sql_func_winsort_lag(stmt, cur, plan);
}

static status_t sql_func_winsort_listagg(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    char *buffer = NULL;
    status_t status;

    OG_RETURN_IFERR(sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&buffer));
    status = sql_func_winsort_aggr(stmt, cursor, plan, AGGR_TYPE_GROUP_CONCAT, buffer);
    cm_pop(stmt->session->stack);
    return status;
}

static status_t sql_func_winsort_min(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    char *buffer = NULL;
    status_t status;

    OG_RETURN_IFERR(sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&buffer));
    status = sql_func_winsort_aggr(stmt, cursor, plan, AGGR_TYPE_MIN, buffer);
    cm_pop(stmt->session->stack);
    return status;
}

static status_t sql_func_winsort_ntile(sql_stmt_t *stmt, sql_cursor_t *cur, plan_node_t *plan)
{
    return sql_func_winsort_ntile_core(stmt, cur, plan, AGGR_TYPE_NTILE);
}

static inline void sql_winsort_aggr_default(variant_t *value)
{
    value->type = OG_TYPE_VM_ROWID;
    value->is_null = OG_FALSE;
    value->v_vmid.vmid = 0;
    value->v_vmid.slot = 0;
}

static inline status_t sql_winsort_aggr_actual(sql_stmt_t *stmt, sql_cursor_t *cursor, expr_node_t *node,
    variant_t *value)
{
    mtrl_rowid_t rid;
    mtrl_row_t *row = &cursor->mtrl.cursor.row;
    uint32 id = VALUE(uint32, &node->value);
    if (row->lens[id] != sizeof(mtrl_rowid_t)) {
        OG_THROW_ERROR_EX(ERR_ASSERT_ERROR, "row->lens[id](%u) == sizeof(mtrl_rowid_t)(%u)", (uint32)row->lens[id],
            (uint32)sizeof(mtrl_rowid_t));
        return OG_ERROR;
    }
    rid = *(mtrl_rowid_t *)(row->data + row->offsets[id]);

    return sql_win_aggr_get(stmt, cursor, &rid, value, cursor->mtrl.winsort_aggr.sid);
}

#define WINSORT_RESULT_HAS_EXTRA_SIZE(row)                                                                  \
    (((aggr_var_t *)(row))->extra_offset != 0 && ((aggr_var_t *)(row))->extra_offset < OG_VMEM_PAGE_SIZE && \
        ((aggr_var_t *)(row))->extra_size > 0)

status_t sql_win_aggr_get(sql_stmt_t *stmt, sql_cursor_t *cursor, mtrl_rowid_t *rid, variant_t *value, uint32 seg_id)
{
    char *row = NULL;
    mtrl_rowid_t next_vid;
    variant_t result;

    if (mtrl_win_aggr_get(&stmt->mtrl, seg_id, (char **)&row, rid, OG_TRUE) != OG_SUCCESS) {
        return OG_ERROR;
    }

    *value = ((aggr_var_t *)row)->var;
    switch (value->type) {
        case OG_TYPE_VM_ROWID:
            next_vid = value->v_vmid;
            return sql_win_aggr_get(stmt, cursor, &next_vid, value, cursor->mtrl.winsort_aggr_ext.sid);

        case OG_TYPE_CHAR:
        case OG_TYPE_VARCHAR:
        case OG_TYPE_STRING:
            result.v_text.str = row + sizeof(aggr_var_t);
            if (WINSORT_RESULT_HAS_EXTRA_SIZE(row)) {
                result.v_text.str += ((aggr_var_t *)row)->extra_size;
            }
            if (value->v_text.len != 0) {
                if (sql_push(stmt, value->v_text.len, (void **)&value->v_text.str) != OG_SUCCESS) {
                    return OG_ERROR;
                }
                MEMS_RETURN_IFERR(memcpy_s(value->v_text.str, value->v_text.len, result.v_text.str, value->v_text.len));
            } else {
                value->v_text.str = NULL;
            }

            return OG_SUCCESS;

        case OG_TYPE_BINARY:
        case OG_TYPE_VARBINARY:
        case OG_TYPE_RAW:
            result.v_bin.bytes = (uint8 *)row + sizeof(aggr_var_t);
            if (WINSORT_RESULT_HAS_EXTRA_SIZE(row)) {
                result.v_bin.bytes += ((aggr_var_t *)row)->extra_size;
            }
            if (value->v_bin.size != 0) {
                if (sql_push(stmt, value->v_bin.size, (void **)&value->v_bin.bytes) != OG_SUCCESS) {
                    return OG_ERROR;
                }
                MEMS_RETURN_IFERR(
                    memcpy_s(value->v_bin.bytes, value->v_bin.size, result.v_bin.bytes, value->v_bin.size));
            } else {
                value->v_bin.bytes = NULL;
            }
        // fall through
        default:
            return OG_SUCCESS;
    }
}

static inline status_t sql_win_aggr_cume_dist_get(sql_stmt_t *stmt, mtrl_rowid_t *rid, variant_t *value, uint32 seg_id)
{
    char *row = NULL;
    char *grp = NULL;
    aggr_var_t *temp = NULL;
    mtrl_rowid_t extra_rid;
    uint32 row_cume;
    uint32 group;

    if (mtrl_win_aggr_get(&stmt->mtrl, seg_id, (char **)&row, rid, OG_TRUE) != OG_SUCCESS) {
        return OG_ERROR;
    }

    temp = (aggr_var_t *)row;
    row_cume = temp->var.v_uint32;

    extra_rid = *((mtrl_rowid_t *)(row + temp->extra_offset));
    if (mtrl_win_aggr_get(&stmt->mtrl, seg_id, (char **)&grp, &extra_rid, OG_TRUE) != OG_SUCCESS) {
        return OG_ERROR;
    }

    group = ((aggr_var_t *)grp)->var.v_uint32;

    value->is_null = OG_FALSE;
    value->type = OG_TYPE_REAL;
    value->v_real = ((double)row_cume) / group;

    return OG_SUCCESS;
}

static inline status_t sql_winsort_cume_dist_actual(sql_stmt_t *stmt, sql_cursor_t *cursor, expr_node_t *node,
    variant_t *value)
{
    mtrl_rowid_t rid;
    mtrl_row_t *row = &cursor->mtrl.cursor.row;
    uint32 id = VALUE(uint32, &node->value);
    if (row->lens[id] != sizeof(mtrl_rowid_t)) {
        OG_THROW_ERROR_EX(ERR_ASSERT_ERROR, "row->lens[id](%u) == sizeof(mtrl_rowid_t)(%u)", (uint32)row->lens[id],
            (uint32)sizeof(mtrl_rowid_t));
        return OG_ERROR;
    }
    rid = *(mtrl_rowid_t *)(row->data + row->offsets[id]);

    return sql_win_aggr_cume_dist_get(stmt, &rid, value, cursor->mtrl.winsort_aggr.sid);
}

static status_t sql_verify_winsort_avg(sql_verifier_t *verif, expr_node_t *winsort)
{
    uint32 excl_flags = verif->excl_flags;
    expr_node_t *func_node = winsort->argument->root;

    OG_BIT_SET(verif->excl_flags, SQL_EXCL_AGGR);

    OG_RETURN_IFERR(sql_verify_avg(verif, func_node));

    verif->excl_flags = excl_flags;
    winsort->typmod = func_node->typmod;
    return OG_SUCCESS;
}

static status_t sql_verify_winsort_cume_dist(sql_verifier_t *verif, expr_node_t *winsort)
{
    uint32 excl_flags = verif->excl_flags;
    expr_node_t *func_node = winsort->argument->root;

    if (winsort->win_args->windowing != NULL) {
        OG_SRC_THROW_ERROR(func_node->loc, ERR_UNSUPPORT_FUNC, "Windowing clause is", "in function CUME_DIST");
        return OG_ERROR;
    }

    OG_BIT_SET(verif->excl_flags, SQL_EXCL_AGGR);
    OG_RETURN_IFERR(sql_verify_func_node(verif, func_node, 0, 0, OG_INVALID_ID32));

    winsort->datatype = OG_TYPE_REAL;
    winsort->size = OG_REAL_SIZE;

    func_node->datatype = OG_TYPE_REAL;
    func_node->size = OG_REAL_SIZE;

    verif->excl_flags = excl_flags;
    winsort->typmod = func_node->typmod;
    return OG_SUCCESS;
}

static status_t sql_func_winsort_avg(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    return sql_func_winsort_aggr(stmt, cursor, plan, AGGR_TYPE_AVG, NULL);
}

static status_t sql_verify_winsort_stddev(sql_verifier_t *verif, expr_node_t *winsort)
{
    uint32 excl_flags = verif->excl_flags;
    expr_node_t *func_node = winsort->argument->root;

    OG_BIT_SET(verif->excl_flags, SQL_EXCL_AGGR);

    OG_RETURN_IFERR(sql_verify_func_node(verif, func_node, 1, 1, OG_INVALID_ID32));

    verif->excl_flags = excl_flags;
    winsort->datatype = OG_TYPE_NUMBER;
    winsort->size = OG_MAX_DEC_OUTPUT_ALL_PREC;
    return OG_SUCCESS;
}

static status_t sql_func_winsort_count(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    return sql_func_winsort_aggr(stmt, cursor, plan, AGGR_TYPE_COUNT, NULL);
}

static status_t sql_verify_winsort_count(sql_verifier_t *verif, expr_node_t *winsort)
{
    uint32 excl_flags = verif->excl_flags;
    expr_node_t *func_node = winsort->argument->root;

    OG_BIT_SET(verif->excl_flags, SQL_EXCL_AGGR);

    OG_RETURN_IFERR(sql_verify_count(verif, func_node));

    verif->excl_flags = excl_flags;
    winsort->typmod = func_node->typmod;
    return OG_SUCCESS;
}

static winsort_func_t g_winsort_funcs[] = {
    { { (char *)"avg", 3 }, sql_func_winsort_avg, sql_verify_winsort_avg, sql_winsort_aggr_default, sql_winsort_aggr_actual },
    { { (char *)"corr", 4 }, sql_func_winsort_corr, sql_verify_winsort_covar_or_corr, sql_winsort_aggr_default, sql_winsort_aggr_actual },
    { { (char *)"count", 5 }, sql_func_winsort_count, sql_verify_winsort_count, sql_winsort_aggr_default, sql_winsort_aggr_actual },
    { { (char *)"covar_pop", 9 }, sql_func_winsort_covar_pop, sql_verify_winsort_covar_or_corr, sql_winsort_aggr_default, sql_winsort_aggr_actual },
    { { (char *)"covar_samp", 10 }, sql_func_winsort_covar_samp, sql_verify_winsort_covar_or_corr, sql_winsort_aggr_default, sql_winsort_aggr_actual },
    { { (char *)"cume_dist", 9 }, sql_func_winsort_cume_dist, sql_verify_winsort_cume_dist, sql_winsort_aggr_default, sql_winsort_cume_dist_actual },
    { { (char *)"dense_rank", 10 }, sql_func_winsort_dense_rank, sql_verify_winsort_row_number, sql_winsort_row_number_default, sql_winsort_row_number_actual },
    { { (char *)"first_value", 11 }, sql_func_winsort_first_value, sql_verify_winsort_first_last_value, sql_winsort_aggr_default, sql_winsort_aggr_actual },
    { { (char *)"lag", 3 }, sql_func_winsort_lag, sql_verify_winsort_lag_row, sql_winsort_aggr_default, sql_winsort_aggr_actual },
    { { (char *)"last_value", 10 }, sql_func_winsort_last_value, sql_verify_winsort_first_last_value, sql_winsort_aggr_default, sql_winsort_aggr_actual },
    { { (char *)"lead", 4 }, sql_func_winsort_lead, sql_verify_winsort_lead_row, sql_winsort_aggr_default, sql_winsort_aggr_actual },
    { { (char *)"listagg", 7 }, sql_func_winsort_listagg, sql_verify_winsort_listagg, sql_winsort_aggr_default, sql_winsort_aggr_actual },
    { { (char *)"max", 3 }, sql_func_winsort_max, sql_verify_winsort_min_max, sql_winsort_aggr_default, sql_winsort_aggr_actual },
    { { (char *)"min", 3 }, sql_func_winsort_min, sql_verify_winsort_min_max, sql_winsort_aggr_default, sql_winsort_aggr_actual },
    { { (char *)"ntile", 5 }, sql_func_winsort_ntile, sql_verify_winsort_ntile, sql_winsort_aggr_default, sql_winsort_ntile_actual },
    { { (char *)"rank", 4 }, sql_func_winsort_rank, sql_verify_winsort_row_number, sql_winsort_row_number_default, sql_winsort_row_number_actual },
    { { (char *)"row_number", 10 }, sql_func_winsort_row_number, sql_verify_winsort_row_number, sql_winsort_row_number_default, sql_winsort_row_number_actual },
    { { (char *)"stddev", 6 }, sql_func_winsort_stddev, sql_verify_winsort_stddev, sql_winsort_aggr_default, sql_winsort_aggr_actual },
    { { (char *)"stddev_pop", 10 }, sql_func_winsort_stddev_pop, sql_verify_winsort_stddev, sql_winsort_aggr_default, sql_winsort_aggr_actual },
    { { (char *)"stddev_samp", 11 }, sql_func_winsort_stddev_samp, sql_verify_winsort_stddev, sql_winsort_aggr_default, sql_winsort_aggr_actual },
    { { (char *)"sum", 3 }, sql_func_winsort_sum, sql_verify_winsort_sum, sql_winsort_aggr_default, sql_winsort_aggr_actual },
    { { (char *)"variance", 8 }, sql_func_winsort_variance, sql_verify_winsort_stddev, sql_winsort_aggr_default, sql_winsort_aggr_actual },
    { { (char *)"var_pop", 7 }, sql_func_winsort_var_pop, sql_verify_winsort_stddev, sql_winsort_aggr_default, sql_winsort_aggr_actual },
    { { (char *)"var_samp", 8 }, sql_func_winsort_var_samp, sql_verify_winsort_stddev, sql_winsort_aggr_default, sql_winsort_aggr_actual }
};

#define SQL_WINSORT_FUNC_COUNT (sizeof(g_winsort_funcs) / sizeof(winsort_func_t))

static text_t *sql_winsort_func_name(void *set, uint32 id)
{
    return &g_winsort_funcs[id].name;
}

winsort_func_t *sql_get_winsort_func(var_func_t *v_func)
{
    return &g_winsort_funcs[v_func->func_id];
}

void sql_winsort_func_binsearch(sql_text_t *func_name, var_func_t *v)
{
    v->func_id = sql_func_binsearch((text_t *)func_name, sql_winsort_func_name, NULL, SQL_WINSORT_FUNC_COUNT);
    v->is_winsort_func = OG_TRUE;
}

status_t sql_get_winsort_func_id(sql_text_t *func_name, var_func_t *v)
{
    sql_winsort_func_binsearch(func_name, v);
    if (v->func_id == OG_INVALID_ID32) {
        OG_THROW_ERROR_EX(ERR_SQL_SYNTAX_ERROR, "WINSORT function does not support %s function",
            T2S((text_t *)func_name));
        return OG_ERROR;
    }
    return OG_SUCCESS;
}

status_t sql_get_winsort_value(sql_stmt_t *stmt, expr_node_t *node, variant_t *value)
{
    expr_node_t *func_node = node->argument->root;
    sql_cursor_t *query_cursor = OGSQL_CURR_CURSOR(stmt);
    winsort_func_t *func = sql_get_winsort_func(&func_node->value.v_func);

    if (query_cursor->winsort_ready == OG_FALSE) {
        func->default_val(value);
        return OG_SUCCESS;
    }
    return func->actual_val(stmt, query_cursor, node, value);
}

status_t sql_fetch_winsort(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan, bool32 *eof)
{
    if (mtrl_fetch_sort(&stmt->mtrl, &cursor->mtrl.cursor) != OG_SUCCESS) {
        return OG_ERROR;
    }
    cursor->mtrl.cursor.type = MTRL_CURSOR_WINSORT;
    *eof = cursor->mtrl.cursor.eof;
    return OG_SUCCESS;
}

static inline status_t sql_push_winsort_pairs(sql_cursor_t *cursor, plan_node_t *plan, bool32 need_free,
    winsort_assist_t *assist)
{
    if (assist->count >= MAX_WINSORT_SIZE) {
        OG_THROW_ERROR(ERR_COLUM_LIST_EXCEED, MAX_WINSORT_SIZE);
        return OG_ERROR;
    }
    assist->pairs[assist->count].plan = plan;
    assist->pairs[assist->count].cursor = cursor;
    assist->pairs[assist->count].need_free = need_free;
    assist->count++;
    return OG_SUCCESS;
}

static inline void sql_free_winsort_pairs(sql_stmt_t *stmt, winsort_assist_t *assist)
{
    for (uint32 i = 0; i < assist->count; i++) {
        if (assist->pairs[i].cursor != NULL && assist->pairs[i].need_free) {
            sql_free_cursor(stmt, assist->pairs[i].cursor);
        }
    }
}

static inline status_t sql_mtrl_winsort_pair(sql_stmt_t *stmt, mtrl_rowid_t *rs_rid, winsort_pair_t *pair)
{
    char *buffer = NULL;
    mtrl_rowid_t rid;
    plan_node_t *plan = pair->plan;
    sql_cursor_t *cursor = pair->cursor;

    OG_RETURN_IFERR(sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&buffer));
    if (sql_make_mtrl_winsort_row(stmt, plan->winsort_p.winsort->win_args, rs_rid, buffer,
        cursor->mtrl.winsort_sort.buf) != OG_SUCCESS) {
        OGSQL_POP(stmt);
        return OG_ERROR;
    }
    if (mtrl_insert_row(&stmt->mtrl, cursor->mtrl.winsort_sort.sid, buffer, &rid) != OG_SUCCESS) {
        OGSQL_POP(stmt);
        return OG_ERROR;
    }
    OGSQL_POP(stmt);
    return OG_SUCCESS;
}

static inline status_t sql_mtrl_winsort_pairs(sql_stmt_t *stmt, mtrl_rowid_t *rs_rid, winsort_assist_t *assist)
{
    for (uint32 i = 0; i < assist->count; i++) {
        OG_RETURN_IFERR(sql_mtrl_winsort_pair(stmt, rs_rid, &assist->pairs[i]));
    }
    return OG_SUCCESS;
}

static inline void sql_winsort_close_pairs_segment(sql_stmt_t *stmt, winsort_assist_t *assist, uint32 count)
{
    winsort_pair_t *pair = NULL;
    for (uint32 i = 0; i < count; i++) {
        pair = &assist->pairs[i];
        mtrl_close_segment(&stmt->mtrl, pair->cursor->mtrl.winsort_sort.sid);
    }
}


static status_t sql_winsort_mtrl_window_types(sql_stmt_t *stmt, sql_cursor_t *cursor, mtrl_resource_t *sort,
    winsort_args_t *args)
{
    windowing_args_t *windowing = args->windowing;
    uint32 type_cnt = 0;
    uint32 mem_cost_size;
    char **buffer = &sort->buf;
    og_type_t *types = NULL;
    if (windowing == NULL) {
        return OG_SUCCESS;
    }
    bool32 need_ltype = windowing->l_expr != NULL && !TREE_IS_CONST(windowing->l_expr);
    bool32 need_rtype = windowing->r_expr != NULL && !TREE_IS_CONST(windowing->r_expr);
    if (need_ltype) {
        type_cnt++;
    }
    if (need_rtype) {
        type_cnt++;
    }
    if (type_cnt == 0) {
        return OG_SUCCESS;
    }

    if (*buffer == NULL) {
        if (args->group_exprs != NULL) {
            type_cnt += args->group_exprs->count;
        }
        if (args->sort_items != NULL) {
            type_cnt += args->sort_items->count;
        }
        mem_cost_size = sizeof(uint32) + type_cnt * sizeof(og_type_t);
        OG_RETURN_IFERR(vmc_alloc(&cursor->vmc, mem_cost_size, (void **)buffer));
        *(uint32 *)*buffer = mem_cost_size;
    }
    types = (og_type_t *)(*buffer + PENDING_HEAD_SIZE);
    if (need_rtype) {
        types[--type_cnt] = windowing->r_expr->root->datatype;
    }
    if (need_ltype) {
        types[--type_cnt] = windowing->l_expr->root->datatype;
    }

    stmt->mtrl.segments[sort->sid]->pending_type_buf = sort->buf;
    return OG_SUCCESS;
}

static inline status_t sql_start_mtrl_winsort_pairs(sql_stmt_t *stmt, winsort_assist_t *assist)
{
    uint32 loop;
    winsort_pair_t *pair = NULL;

    for (loop = 0; loop < assist->count; loop++) {
        pair = &assist->pairs[loop];
        OG_BREAK_IF_ERROR(mtrl_create_segment(&stmt->mtrl, MTRL_SEGMENT_WINSORT,
            (handle_t)pair->plan->winsort_p.winsort->win_args, &pair->cursor->mtrl.winsort_sort.sid));
        OG_BREAK_IF_ERROR(mtrl_open_segment(&stmt->mtrl, pair->cursor->mtrl.winsort_sort.sid));
        stmt->mtrl.segments[pair->cursor->mtrl.winsort_sort.sid]->cmp_flag = (WINSORT_PART | WINSORT_ORDER);
        OG_RETURN_IFERR(sql_winsort_mtrl_window_types(stmt, pair->cursor, &pair->cursor->mtrl.winsort_sort,
            pair->plan->winsort_p.winsort->win_args));
    }

    if (loop != assist->count) {
        sql_winsort_close_pairs_segment(stmt, assist, loop);
        return OG_ERROR;
    }
    return OG_SUCCESS;
}

static inline void sql_win_pair_reset_pending_buf(mtrl_context_t *mtrl_ctx, mtrl_resource_t *winsort)
{
    if (mtrl_ctx->segments[winsort->sid]->pending_type_buf == NULL) {
        mtrl_ctx->segments[winsort->sid]->pending_type_buf = winsort->buf;
    }
}

static inline status_t sql_end_mtrl_winsort_pairs(sql_stmt_t *stmt, winsort_assist_t *assist)
{
    status_t status = OG_SUCCESS;
    winsort_pair_t *pair = NULL;

    for (uint32 i = 0; i < assist->count; i++) {
        pair = &assist->pairs[i];
        mtrl_close_segment(&stmt->mtrl, pair->cursor->mtrl.winsort_sort.sid);
        if (status == OG_SUCCESS && mtrl_sort_segment(&stmt->mtrl, pair->cursor->mtrl.winsort_sort.sid) != OG_SUCCESS) {
            status = OG_ERROR;
        }
        sql_win_pair_reset_pending_buf(&stmt->mtrl, &pair->cursor->mtrl.winsort_sort);
    }
    return status;
}

static status_t sql_winsort_mtrl_rs_record_types(sql_stmt_t *stmt, sql_cursor_t *cursor, galist_t *winsort_rs_columns)
{
    rs_column_t *rs_col = NULL;
    uint32 mem_cost_size;
    og_type_t *types = NULL;
    char **buf = &cursor->mtrl.winsort_rs.buf;

    if (*buf == NULL) {
        mem_cost_size = sizeof(uint32) + winsort_rs_columns->count * sizeof(og_type_t);
        OG_RETURN_IFERR(vmc_alloc(&cursor->vmc, mem_cost_size, (void **)buf));
        *(uint32 *)*buf = mem_cost_size;
    }
    types = (og_type_t *)(*buf + PENDING_HEAD_SIZE);
    for (uint32 i = 0; i < winsort_rs_columns->count; i++) {
        rs_col = (rs_column_t *)cm_galist_get(winsort_rs_columns, i);
        types[i] = rs_col->datatype;
    }
    stmt->mtrl.segments[cursor->mtrl.winsort_rs.sid]->pending_type_buf = cursor->mtrl.winsort_rs.buf;
    return OG_SUCCESS;
}

static status_t sql_prepare_winsort_rs(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan,
    galist_t *winsort_rs_columns, winsort_assist_t *assist)
{
    status_t status = OG_ERROR;
    bool32 eof = OG_FALSE;
    char *buf = NULL;
    mtrl_rowid_t rid;

    // winsort aggr
    OG_RETURN_IFERR(mtrl_create_segment(&stmt->mtrl, MTRL_SEGMENT_WINSORT_AGGR, NULL, &cursor->mtrl.winsort_aggr.sid));
    OG_RETURN_IFERR(mtrl_open_segment(&stmt->mtrl, cursor->mtrl.winsort_aggr.sid));
    // winsort aggr_ext
    OG_RETURN_IFERR(
        mtrl_create_segment(&stmt->mtrl, MTRL_SEGMENT_WINSORT_AGGR, NULL, &cursor->mtrl.winsort_aggr_ext.sid));
    OG_RETURN_IFERR(mtrl_open_segment(&stmt->mtrl, cursor->mtrl.winsort_aggr_ext.sid));
    // winsort rs
    OG_RETURN_IFERR(mtrl_create_segment(&stmt->mtrl, MTRL_SEGMENT_WINSORT_RS, NULL, &cursor->mtrl.winsort_rs.sid));
    OG_RETURN_IFERR(mtrl_open_segment(&stmt->mtrl, cursor->mtrl.winsort_rs.sid));
    if (cursor->select_ctx != NULL && cursor->select_ctx->pending_col_count > 0) {
        OG_RETURN_IFERR(sql_winsort_mtrl_rs_record_types(stmt, cursor, winsort_rs_columns));
    }

    // prepare mtrl segment
    if (sql_start_mtrl_winsort_pairs(stmt, assist) != OG_SUCCESS) {
        mtrl_close_segment(&stmt->mtrl, cursor->mtrl.winsort_rs.sid);
        return OG_ERROR;
    }

    if (sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&buf) != OG_SUCCESS) {
        mtrl_close_segment(&stmt->mtrl, cursor->mtrl.winsort_rs.sid);
        sql_winsort_close_pairs_segment(stmt, assist, assist->count);
        return OG_ERROR;
    }

    OGSQL_SAVE_STACK(stmt);
    for (;;) {
        OG_BREAK_IF_ERROR(sql_fetch_query(stmt, cursor, plan, &eof));

        if (eof) {
            status = OG_SUCCESS;
            break;
        }

        OG_BREAK_IF_ERROR(sql_make_mtrl_rs_row(stmt, cursor->mtrl.winsort_rs.buf, winsort_rs_columns, buf));
        OG_BREAK_IF_ERROR(mtrl_insert_row(&stmt->mtrl, cursor->mtrl.winsort_rs.sid, buf, &rid));
        OG_BREAK_IF_ERROR(sql_mtrl_winsort_pairs(stmt, &rid, assist));

        OGSQL_RESTORE_STACK(stmt);
    }
    OGSQL_RESTORE_STACK(stmt);
    OGSQL_POP(stmt);
    mtrl_close_segment(&stmt->mtrl, cursor->mtrl.winsort_rs.sid);
    if (status != OG_SUCCESS) {
        sql_winsort_close_pairs_segment(stmt, assist, assist->count);
        return OG_ERROR;
    }
    return sql_end_mtrl_winsort_pairs(stmt, assist);
}

static status_t sql_execute_winsort_plan(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan_node,
    galist_t *winsort_rs_columns, winsort_assist_t *assist)
{
    sql_cursor_t *sub_cursor = NULL;
    plan_node_t *plan = plan_node;

    while (plan->type == PLAN_NODE_WINDOW_SORT) {
        if (plan->winsort_p.winsort->win_args->is_rs_node) {
            OG_RETURN_IFERR(sql_push_winsort_pairs(cursor, plan, OG_FALSE, assist));
            plan = plan->winsort_p.next;
            continue;
        }
        OG_RETURN_IFERR(sql_alloc_cursor(stmt, &sub_cursor));
        sql_init_ssa_cursor_maps(sub_cursor, OG_MAX_SUBSELECT_EXPRS);
        sub_cursor->is_open = OG_TRUE;
        sub_cursor->scn = cursor->scn;
        sub_cursor->ancestor_ref = cursor->ancestor_ref;
        OG_RETURN_IFERR(sql_push_winsort_pairs(sub_cursor, plan, OG_TRUE, assist));
        plan = plan->winsort_p.next;
    }
    OG_RETURN_IFERR(sql_execute_query_plan(stmt, cursor, plan));
    if (cursor->eof) {
        return OG_SUCCESS;
    }

    return sql_prepare_winsort_rs(stmt, cursor, plan, winsort_rs_columns, assist);
}

static status_t sql_invoke_winsort_func(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    expr_node_t *func_node = NULL;
    winsort_func_t *func = NULL;
    func_node = plan->winsort_p.winsort->argument->root;
    func = sql_get_winsort_func(&func_node->value.v_func);
    return func->invoke(stmt, cursor, plan);
}

status_t sql_execute_winsort(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    winsort_pair_t *pair = NULL;
    winsort_assist_t ass;
    galist_t *winsort_rs_columns = plan->winsort_p.rs_columns;

    ass.count = 0;

    if (sql_execute_winsort_plan(stmt, cursor, plan, winsort_rs_columns, &ass) != OG_SUCCESS) {
        sql_free_winsort_pairs(stmt, &ass);
        return OG_ERROR;
    }

    if (cursor->eof) {
        sql_free_winsort_pairs(stmt, &ass);
        return OG_SUCCESS;
    }

    for (uint32 i = 0; i < ass.count; i++) {
        pair = &ass.pairs[i];

        if (sql_invoke_winsort_func(stmt, pair->cursor, pair->plan) != OG_SUCCESS) {
            sql_free_winsort_pairs(stmt, &ass);
            return OG_ERROR;
        }
        if (pair->need_free) {
            sql_free_cursor(stmt, pair->cursor);
            pair->cursor = NULL;
        }
    }

    OG_RETURN_IFERR(mtrl_open_cursor(&stmt->mtrl, cursor->mtrl.winsort_sort.sid, &cursor->mtrl.cursor));
    cursor->mtrl.cursor.type = MTRL_CURSOR_WINSORT;
    cursor->winsort_ready = OG_TRUE;
    return OG_SUCCESS;
}

status_t sql_send_sort_row_filter(sql_stmt_t *stmt, sql_cursor_t *cursor, bool32 *is_full)
{
    uint32 i;
    rs_column_t *rs_column = NULL;
    variant_t rs_var;
    uint32 count = cursor->columns->count;

    OG_RETURN_IFERR(my_sender(stmt)->send_row_begin(stmt, count));
    for (i = 0; i < count; i++) {
        rs_column = (rs_column_t *)cm_galist_get(cursor->columns, i);
        OG_RETURN_IFERR(sql_exec_expr(stmt, rs_column->expr, &rs_var));
        if (sql_send_value(stmt, cursor->mtrl.rs.buf, rs_column->datatype, &rs_column->typmod, &rs_var) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }
    OG_RETURN_IFERR(my_sender(stmt)->send_row_end(stmt, is_full));
    sql_inc_rows(stmt, cursor);
    return OG_SUCCESS;
}