/* -------------------------------------------------------------------------
 *  This file is part of the oGRAC project.
 * Copyright (c) 2024 Huawei Technologies Co.,Ltd.
 *
 * oGRAC is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * ogsql_distinct.c
 *
 *
 * IDENTIFICATION
 * src/ogsql/executor/ogsql_distinct.c
 *
 * -------------------------------------------------------------------------
 */
#include "ogsql_distinct.h"
#include "ogsql_select.h"
#include "ogsql_mtrl.h"
#include "knl_mtrl.h"
#include "srv_instance.h"

static inline status_t distinct_hash_oper_func(void *callback_ctx, const char *new_buf, uint32 new_size,
    const char *old_buf, uint32 old_size, bool32 found)
{
    char *row_buf = NULL;
    sql_cursor_t *cursor = (sql_cursor_t *)callback_ctx;

    if (!found) {
        row_buf = cursor->mtrl.cursor.distinct.row.data;
        MEMS_RETURN_IFERR(memcpy_s(row_buf, OG_MAX_ROW_SIZE, old_buf, old_size));

        mtrl_cursor_t *mtrl_cursor = &cursor->mtrl.cursor;
        mtrl_cursor->distinct.eof = OG_FALSE;
        mtrl_cursor->eof = OG_FALSE;
        mtrl_hash_distinct_cursor_t *hash_distinct_cursor = &mtrl_cursor->distinct;
        cm_decode_row(hash_distinct_cursor->row.data, hash_distinct_cursor->row.offsets, hash_distinct_cursor->row.lens,
            NULL);
        mtrl_cursor->type = MTRL_CURSOR_HASH_DISTINCT;
    }
    return OG_SUCCESS;
}

status_t sql_execute_hash_distinct(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    status_t status = OG_ERROR;
#ifdef TIME_STATISTIC
    clock_t start;
    double timeuse;
    start = cm_cal_time_bengin();
#endif

    do {
        OG_BREAK_IF_ERROR(sql_execute_query_plan(stmt, cursor, plan->distinct.next));
        if (cursor->eof) {
            status = OG_SUCCESS;
            break;
        }

        OG_BREAK_IF_ERROR(mtrl_create_segment(&stmt->mtrl, MTRL_SEGMENT_DISTINCT, NULL, &cursor->mtrl.distinct));
        OG_BREAK_IF_ERROR(sql_alloc_distinct_ctx(stmt, cursor, plan, HASH_DISTINCT));
        cursor->mtrl.cursor.distinct.eof = OG_FALSE;
        status = OG_SUCCESS;
    } while (0);

#ifdef TIME_STATISTIC
    timeuse = cm_cal_time_end(start);
    stmt->mt_time += timeuse;
#endif
    return status;
}

status_t sql_fetch_hash_distinct(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan, bool32 *eof)
{
    bool32 exist_row = OG_FALSE;
    char *buf = NULL;
    hash_segment_t *hash_segment = NULL;
    hash_table_entry_t *hash_table_entry = NULL;

    if (cursor->eof) {
        *eof = OG_TRUE;
        cursor->mtrl.cursor.distinct.eof = OG_TRUE;
        cursor->mtrl.cursor.type = MTRL_CURSOR_HASH_DISTINCT;
        return OG_SUCCESS;
    }

    OG_RETURN_IFERR(sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&buf));

    for (;;) {
        OGSQL_SAVE_STACK(stmt);
        if (sql_fetch_query(stmt, cursor, plan->distinct.next, eof) != OG_SUCCESS) {
            OGSQL_RESTORE_STACK(stmt);
            OGSQL_POP(stmt);
            return OG_ERROR;
        }

        if (*eof) {
            OGSQL_RESTORE_STACK(stmt);
            OGSQL_POP(stmt);
            cursor->mtrl.cursor.distinct.eof = OG_TRUE;
            cursor->mtrl.cursor.type = MTRL_CURSOR_HASH_DISTINCT;
            return OG_SUCCESS;
        }

        if (sql_make_mtrl_rs_row(stmt, cursor->mtrl.rs.buf, plan->distinct.columns, buf) != OG_SUCCESS) {
            OGSQL_RESTORE_STACK(stmt);
            OGSQL_POP(stmt);
            return OG_ERROR;
        }
        hash_segment = &cursor->distinct_ctx->hash_segment;
        hash_table_entry = &cursor->distinct_ctx->hash_table_entry;
        if (vm_hash_table_insert2(&exist_row, hash_segment, hash_table_entry, buf, ((row_head_t *)buf)->size) !=
            OG_SUCCESS) {
            OGSQL_RESTORE_STACK(stmt);
            OGSQL_POP(stmt);
            return OG_ERROR;
        }
        OGSQL_RESTORE_STACK(stmt);

        if (!exist_row) {
            break;
        }
    }

    OGSQL_POP(stmt);
    return OG_SUCCESS;
}

static status_t sql_get_distinct_index_row_buf(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan, bool32 *eof,
    char **next_row_buf)
{
    mtrl_segment_t *segment = stmt->mtrl.segments[cursor->mtrl.index_distinct];
    mtrl_page_t *page = (mtrl_page_t *)segment->curr_page->data;
    bool32 *flag = (bool32 *)((char *)page + page->free_begin);
    char *row_buf1 = (char *)page + page->free_begin + sizeof(bool32);
    char *row_buf2 = row_buf1 + OG_MAX_ROW_SIZE;
    mtrl_cursor_t *mtrl_cursor = &cursor->mtrl.cursor;

    if (*flag) {
        *flag = OG_FALSE;
        OG_RETURN_IFERR(sql_fetch_query(stmt, cursor, plan->distinct.next, eof));
        if (*eof) {
            return OG_SUCCESS;
        }
        OG_RETURN_IFERR(sql_make_mtrl_rs_row(stmt, cursor->mtrl.rs.buf, plan->distinct.columns, row_buf1));
        mtrl_cursor->row.data = row_buf1;
        *next_row_buf = row_buf2;
    } else {
        if (mtrl_cursor->row.data == row_buf1) {
            mtrl_cursor->row.data = row_buf2;
            *next_row_buf = row_buf1;
        } else {
            mtrl_cursor->row.data = row_buf1;
            *next_row_buf = row_buf2;
        }
        *eof = IS_INVALID_ROW(mtrl_cursor->row.data);
        if (*eof) {
            return OG_SUCCESS;
        }
    }
    cm_decode_row(mtrl_cursor->row.data, mtrl_cursor->row.offsets, mtrl_cursor->row.lens, NULL);
    return OG_SUCCESS;
}

status_t sql_fetch_index_distinct(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan, bool32 *eof)
{
    int32 result;
    bool32 distinct_eof = OG_FALSE;
    char *next_row_buf = NULL;
    mtrl_cursor_t *mtrl_cursor = &cursor->mtrl.cursor;
    mtrl_segment_t *segment = stmt->mtrl.segments[cursor->mtrl.index_distinct];

    if (cursor->eof) {
        *eof = OG_TRUE;
        return OG_SUCCESS;
    }

    OG_RETURN_IFERR(sql_get_distinct_index_row_buf(stmt, cursor, plan, eof, &next_row_buf));
    if (*eof) {
        return OG_SUCCESS;
    }

    for (;;) {
        OG_RETURN_IFERR(sql_fetch_query(stmt, cursor, plan->distinct.next, &distinct_eof));
        if (distinct_eof) {
            CM_SET_INVALID_ROW(next_row_buf);
            break;
        }
        OG_RETURN_IFERR(sql_make_mtrl_rs_row(stmt, cursor->mtrl.rs.buf, plan->distinct.columns, next_row_buf));
        OG_RETURN_IFERR(sql_mtrl_sort_cmp(segment, mtrl_cursor->row.data, next_row_buf, &result));
        if (result != 0) {
            break;
        }
    }
    return OG_SUCCESS;
}

status_t sql_execute_index_distinct(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    OG_RETURN_IFERR(sql_execute_query_plan(stmt, cursor, plan->distinct.next));
    if (cursor->eof) {
        return OG_SUCCESS;
    }

    OG_RETURN_IFERR(
        mtrl_create_segment(&stmt->mtrl, MTRL_SEGMENT_DISTINCT, plan->distinct.columns, &cursor->mtrl.index_distinct));
    OG_RETURN_IFERR(mtrl_open_segment(&stmt->mtrl, cursor->mtrl.index_distinct));

    mtrl_page_t *page = (mtrl_page_t *)stmt->mtrl.segments[cursor->mtrl.index_distinct]->curr_page->data;
    if (page->free_begin + 2 * OG_MAX_ROW_SIZE + sizeof(bool32) > OG_VMEM_PAGE_SIZE) {
        OG_THROW_ERROR(ERR_NO_FREE_VMEM, "one page free size is smaller than needed memory");
        return OG_ERROR;
    }
    *(bool32 *)((char *)page + page->free_begin) = OG_TRUE;
    cursor->mtrl.cursor.type = MTRL_CURSOR_OTHERS;
    return OG_SUCCESS;
}

static inline og_type_t sql_get_rs_col_datatype(galist_t *rs_columns, uint32 col_id, char *pending_buf)
{
    rs_column_t *rs_col = (rs_column_t *)cm_galist_get(rs_columns, col_id);

    if (rs_col->datatype != OG_TYPE_UNKNOWN) {
        return rs_col->datatype;
    }

    return sql_get_pending_type(pending_buf, col_id);
}

static inline int32 sql_sort_distinct_cmp_g(galist_t *rs_columns, mtrl_row_t *row1, mtrl_row_t *row2, uint32 col_id,
    char *pending_buf, const order_mode_t *order_mode)
{
    og_type_t datatype = sql_get_rs_col_datatype(rs_columns, col_id, pending_buf);
    return sql_sort_mtrl_rows(row1, row2, col_id, datatype, order_mode);
}

static inline int32 sql_sort_distinct_cmp_i(galist_t *rs_columns, mtrl_row_t *row1, mtrl_row_t *row2, uint32 col_id,
    char *pending_buf)
{
    og_type_t datatype = sql_get_rs_col_datatype(rs_columns, col_id, pending_buf);
    return sql_compare_data_ex(MT_CDATA(row1, col_id), MT_CSIZE(row1, col_id), MT_CDATA(row2, col_id),
        MT_CSIZE(row2, col_id), datatype);
}

static status_t sql_sort_distinct_cmp(int32 *result, void *callback_ctx, char *lbuf, uint32 lsize, char *rbuf,
    uint32 rsize)
{
    mtrl_row_t row1;
    mtrl_row_t row2;
    btree_cmp_key_t *btree_cmp_key = NULL;
    btree_sort_key_t *btree_sort_key = NULL;
    sql_cursor_t *cursor = (sql_cursor_t *)callback_ctx;
    distinct_ctx_t *distinct_ctx = cursor->distinct_ctx;
    galist_t *rs_columns = distinct_ctx->distinct_p->columns;
    btree_sort_t *btree_sort = distinct_ctx->distinct_p->btree_sort;

    row1.data = lbuf;
    cm_decode_row(lbuf, row1.offsets, row1.lens, NULL);
    row2.data = rbuf;
    cm_decode_row(rbuf, row2.offsets, row2.lens, NULL);

    for (uint32 i = 0; i < btree_sort->sort_key.count; ++i) {
        btree_sort_key = (btree_sort_key_t *)cm_galist_get(&btree_sort->sort_key, i);
        *result = sql_sort_distinct_cmp_g(rs_columns, &row1, &row2, btree_sort_key->group_id, cursor->mtrl.rs.buf,
            &btree_sort_key->sort_mode);
        if (*result != 0) {
            return OG_SUCCESS;
        }
    }

    for (uint32 i = 0; i < btree_sort->cmp_key.count; ++i) {
        btree_cmp_key = (btree_cmp_key_t *)cm_galist_get(&btree_sort->cmp_key, i);
        *result = sql_sort_distinct_cmp_i(rs_columns, &row1, &row2, btree_cmp_key->group_id, cursor->mtrl.rs.buf);
        if (*result != 0) {
            return OG_SUCCESS;
        }
    }
    return OG_SUCCESS;
}

void sql_free_distinct_ctx(distinct_ctx_t *distinct_ctx)
{
    if (distinct_ctx->type == SORT_DISTINCT) {
        sql_btree_deinit(&distinct_ctx->btree_seg);
    } else {
        vm_hash_segment_deinit(&distinct_ctx->hash_segment);
    }
}

status_t sql_alloc_distinct_ctx(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan, distinct_type_t type)
{
    OG_RETURN_IFERR(vmc_alloc(&cursor->vmc, sizeof(distinct_ctx_t), (void **)&cursor->distinct_ctx));
    cursor->distinct_ctx->type = type;

    if (type == SORT_DISTINCT) {
        cursor->distinct_ctx->distinct_p = &plan->distinct;
        return sql_btree_init(&cursor->distinct_ctx->btree_seg, stmt->session, stmt->session->knl_session.temp_pool,
            cursor, sql_sort_distinct_cmp, NULL);
    }

    hash_segment_t *hash_segment = &cursor->distinct_ctx->hash_segment;
    vm_hash_segment_init(&stmt->session->knl_session, stmt->mtrl.pool, hash_segment, PMA_POOL, HASH_PAGES_HOLD,
        HASH_AREA_SIZE);

    hash_table_entry_t *hash_table_entry = &cursor->distinct_ctx->hash_table_entry;
    uint32 bucket_num = sql_get_plan_hash_rows(stmt, plan);
    if (stmt->context->hash_bucket_size != 0) {
        bucket_num = stmt->context->hash_bucket_size;
    }
    if (vm_hash_table_alloc(hash_table_entry, hash_segment, bucket_num) != OG_SUCCESS) {
        vm_hash_segment_deinit(hash_segment);
        return OG_ERROR;
    }

    if (vm_hash_table_init(hash_segment, hash_table_entry, distinct_hash_oper_func, NULL, cursor) != OG_SUCCESS) {
        vm_hash_segment_deinit(hash_segment);
        return OG_ERROR;
    }

    mtrl_segment_t *segment = stmt->mtrl.segments[cursor->mtrl.distinct];
    uint32 vm_id = segment->vm_list.last;
    if (mtrl_open_page(&stmt->mtrl, vm_id, &segment->curr_page) != OG_SUCCESS) {
        return OG_ERROR;
    }

    uint32 column_count = (type == HASH_DISTINCT) ? plan->distinct.columns->count : cursor->columns->count;
    OG_RETURN_IFERR(
        vmc_alloc(&cursor->vmc, column_count * sizeof(uint16), (void **)&cursor->mtrl.cursor.distinct.row.lens));
    OG_RETURN_IFERR(
        vmc_alloc(&cursor->vmc, column_count * sizeof(uint16), (void **)&cursor->mtrl.cursor.distinct.row.offsets));

    cursor->mtrl.cursor.distinct.row.data = segment->curr_page->data;
    return OG_SUCCESS;
}

static status_t sql_mtrl_sort_distinct(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    bool32 eof = OG_FALSE;
    char *buf = NULL;
    uint32 key_size;
    status_t status = OG_ERROR;
    distinct_ctx_t *distinct_ctx = cursor->distinct_ctx;

    OG_RETURN_IFERR(SQL_CURSOR_PUSH(stmt, cursor));
    OG_RETURN_IFERR(sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&buf));
    OGSQL_SAVE_STACK(stmt);

    for (;;) {
        OG_BREAK_IF_ERROR(sql_fetch_query(stmt, cursor, plan->distinct.next, &eof));
        if (eof) {
            status = OG_SUCCESS;
            break;
        }

        OG_BREAK_IF_ERROR(sql_make_mtrl_rs_row(stmt, cursor->mtrl.rs.buf, plan->distinct.columns, buf));

        key_size = ((row_head_t *)buf)->size;
        OG_BREAK_IF_ERROR(sql_btree_insert(&distinct_ctx->btree_seg, buf, key_size, key_size));

        OGSQL_RESTORE_STACK(stmt);
    }
    OGSQL_RESTORE_STACK(stmt);
    OGSQL_POP(stmt);
    SQL_CURSOR_POP(stmt);
    OG_RETURN_IFERR(sql_free_query_mtrl(stmt, cursor, plan->distinct.next));
    return status;
}

status_t sql_execute_sort_distinct(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    OG_RETURN_IFERR(sql_execute_query_plan(stmt, cursor, plan->distinct.next));
    if (cursor->eof) {
        return OG_SUCCESS;
    }

    OG_RETURN_IFERR(sql_alloc_distinct_ctx(stmt, cursor, plan, SORT_DISTINCT));
    OG_RETURN_IFERR(sql_mtrl_sort_distinct(stmt, cursor, plan));
    if (cursor->eof) {
        return OG_SUCCESS;
    }

    distinct_ctx_t *distinct_ctx = cursor->distinct_ctx;
    return sql_btree_open(&distinct_ctx->btree_seg, &distinct_ctx->btree_cursor);
}

status_t sql_fetch_sort_distinct(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan, bool32 *eof)
{
    sql_btree_row_t *btree_row = NULL;
    mtrl_cursor_t *mtrl_cursor = &cursor->mtrl.cursor;
    distinct_ctx_t *distinct_ctx = cursor->distinct_ctx;

    OG_RETURN_IFERR(sql_btree_fetch(&distinct_ctx->btree_seg, &distinct_ctx->btree_cursor, eof));
    mtrl_cursor->type = MTRL_CURSOR_OTHERS;
    if (*eof) {
        return OG_SUCCESS;
    }

    btree_row = distinct_ctx->btree_cursor.btree_row;
    mtrl_cursor->eof = OG_FALSE;
    mtrl_cursor->row.data = btree_row->data;
    cm_decode_row(mtrl_cursor->row.data, mtrl_cursor->row.offsets, mtrl_cursor->row.lens, NULL);
    return OG_SUCCESS;
}