* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* ogsql_sort.c
*
*
* IDENTIFICATION
* src/ogsql/executor/ogsql_sort.c
*
* -------------------------------------------------------------------------
*/
#include "ogsql_sort.h"
#include "ogsql_mtrl.h"
#include "ogsql_select.h"
#include "srv_instance.h"
#include "knl_mtrl.h"
status_t sql_fetch_sort(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan, bool32 *eof)
{
if (mtrl_fetch_sort(&stmt->mtrl, &cursor->mtrl.cursor) != OG_SUCCESS) {
return OG_ERROR;
}
*eof = cursor->mtrl.cursor.eof;
return OG_SUCCESS;
}
static status_t sql_cmp_with_top_row(mtrl_context_t *ogx, uint32 seg_id, mtrl_rowid_list_t *rowid_list, char *sort_row,
bool32 *candidate)
{
*candidate = OG_TRUE;
if (rowid_list->row_cnt < rowid_list->max_row) {
return OG_SUCCESS;
}
mtrl_segment_t *seg = ogx->segments[seg_id];
mtrl_rowid_t top_rid = rowid_list->rid[rowid_list->max_row - 1];
vm_page_t *tmp_page = seg->curr_page;
mtrl_page_t *page = NULL;
char *cmp_row = NULL;
int32 res;
status_t status;
if (tmp_page->vmid != top_rid.vmid) {
OG_RETURN_IFERR(vm_open(ogx->session, ogx->pool, top_rid.vmid, &tmp_page));
}
page = (mtrl_page_t *)tmp_page->data;
cmp_row = MTRL_GET_ROW(page, top_rid.slot);
status = ogx->sort_cmp(seg, sort_row, cmp_row, &res);
if (status == OG_SUCCESS && res >= 0) {
*candidate = OG_FALSE;
}
if (tmp_page->vmid != seg->curr_page->vmid) {
vm_close(ogx->session, ogx->pool, tmp_page->vmid, VM_ENQUE_TAIL);
}
return status;
}
static status_t sql_make_query_sort_row(char *sort_buf, mtrl_rowid_t rid)
{
row_head_t *row = (row_head_t *)sort_buf;
if (row->size + sizeof(mtrl_rowid_t) > OG_MAX_ROW_SIZE) {
OG_THROW_ERROR(ERR_EXCEED_MAX_ROW_SIZE, row->size + sizeof(mtrl_rowid_t), OG_MAX_ROW_SIZE);
return OG_ERROR;
}
*(mtrl_rowid_t *)(sort_buf + row->size) = rid;
row->size += sizeof(mtrl_rowid_t);
return OG_SUCCESS;
}
static status_t mtrl_get_sort_pos(mtrl_context_t *ogx, mtrl_segment_t *segment, mtrl_rowid_list_t *rowid_list, char
*row,
uint32 *pos)
{
if (rowid_list->row_cnt == 0) {
*pos = 0;
return OG_SUCCESS;
}
uint32 begin = 0;
uint32 end = rowid_list->row_cnt - 1;
uint32 curr = 0;
mtrl_rowid_t cmp_rid = rowid_list->rid[end];
int32 res;
vm_page_t *temp_page = NULL;
mtrl_page_t *page = NULL;
OG_RETURN_IFERR(vm_open(ogx->session, ogx->pool, cmp_rid.vmid, &temp_page));
page = (mtrl_page_t *)temp_page->data;
char *cmp_row = MTRL_GET_ROW(page, cmp_rid.slot);
if (ogx->sort_cmp(segment, row, cmp_row, &res) != OG_SUCCESS) {
vm_close(ogx->session, ogx->pool, temp_page->vmid, VM_ENQUE_TAIL);
return OG_ERROR;
}
if (res >= 0) {
*pos = rowid_list->row_cnt;
vm_close(ogx->session, ogx->pool, temp_page->vmid, VM_ENQUE_TAIL);
return OG_SUCCESS;
}
while (begin < end) {
curr = (end + begin) >> 1;
cmp_rid = rowid_list->rid[curr];
if (temp_page->vmid != cmp_rid.vmid) {
vm_close(ogx->session, ogx->pool, temp_page->vmid, VM_ENQUE_TAIL);
OG_RETURN_IFERR(vm_open(ogx->session, ogx->pool, cmp_rid.vmid, &temp_page));
page = (mtrl_page_t *)temp_page->data;
}
cmp_row = MTRL_GET_ROW(page, cmp_rid.slot);
if (ogx->sort_cmp(segment, row, cmp_row, &res) != OG_SUCCESS) {
vm_close(ogx->session, ogx->pool, temp_page->vmid, VM_ENQUE_TAIL);
return OG_ERROR;
}
OG_BREAK_IF_TRUE(res == 0);
if (res < 0) {
end = curr;
} else {
begin = curr + 1;
}
}
*pos = res >= 0 ? curr + 1 : curr;
vm_close(ogx->session, ogx->pool, temp_page->vmid, VM_ENQUE_TAIL);
return OG_SUCCESS;
}
static status_t sql_save_result_into_page(mtrl_context_t *ogx, uint32 seg_id, mtrl_rowid_list_t *rowid_list)
{
mtrl_segment_t *seg = ogx->segments[seg_id];
id_list_t ori_list = seg->vm_list;
mtrl_page_t *page = NULL;
mtrl_page_t *ori_page = NULL;
vm_page_t *tmp_page = NULL;
char *sort_row = NULL;
mtrl_rowid_t rowid;
status_t status = OG_ERROR;
if (rowid_list->row_cnt == 0) {
return OG_SUCCESS;
}
mtrl_close_segment(ogx, seg_id);
do {
seg->vm_list.count = 0;
OG_BREAK_IF_ERROR(mtrl_extend_segment(ogx, seg));
OG_BREAK_IF_ERROR(mtrl_open_page(ogx, seg->vm_list.first, &seg->curr_page));
page = (mtrl_page_t *)seg->curr_page->data;
mtrl_init_page(page, seg->vm_list.first);
OG_BREAK_IF_ERROR(vm_open(ogx->session, ogx->pool, rowid_list->rid[0].vmid, &tmp_page));
ori_page = (mtrl_page_t *)tmp_page->data;
handle_t cmp_items = seg->cmp_items;
seg->cmp_items = NULL;
for (uint32 i = 0; i < rowid_list->row_cnt; i++) {
rowid = rowid_list->rid[i];
if (rowid.vmid != tmp_page->vmid) {
if (vm_open(ogx->session, ogx->pool, rowid.vmid, &tmp_page) != OG_SUCCESS) {
vm_free_list(ogx->session, ogx->pool, &ori_list);
return OG_ERROR;
}
ori_page = (mtrl_page_t *)tmp_page->data;
}
sort_row = MTRL_GET_ROW(ori_page, rowid.slot);
if (mtrl_insert_row(ogx, seg_id, sort_row, &rowid) != OG_SUCCESS) {
vm_free_list(ogx->session, ogx->pool, &ori_list);
return OG_ERROR;
}
rowid_list->rid[i] = rowid;
}
seg->cmp_items = cmp_items;
status = OG_SUCCESS;
} while (0);
OG_LOG_DEBUG_INF("shrink vm page: original num %u, current num %u", ori_list.count, seg->vm_list.count);
vm_free_list(ogx->session, ogx->pool, &ori_list);
return status;
}
static status_t sql_mtrl_insert_sort_row(mtrl_context_t *ogx, uint32 seg_id, char *row, mtrl_rowid_t rs_rid,
mtrl_rowid_list_t *rowid_list)
{
mtrl_rowid_t rid;
uint32 pos;
mtrl_segment_t *seg = ogx->segments[seg_id];
uint32 copy_size;
uint32 max_size;
uint32 row_size;
double utilization;
OG_RETURN_IFERR(sql_make_query_sort_row(row, rs_rid));
handle_t cmp_items = seg->cmp_items;
seg->cmp_items = NULL;
OG_RETURN_IFERR(mtrl_insert_row(ogx, seg_id, row, &rid));
seg->cmp_items = cmp_items;
OG_RETURN_IFERR(mtrl_get_sort_pos(ogx, seg, rowid_list, row, &pos));
if (pos == rowid_list->row_cnt && rowid_list->row_cnt < rowid_list->max_row) {
rowid_list->rid[rowid_list->row_cnt++] = rid;
return OG_SUCCESS;
}
copy_size = (rowid_list->row_cnt - pos) * sizeof(mtrl_rowid_t);
max_size = (rowid_list->max_row - pos - 1) * sizeof(mtrl_rowid_t);
if (rowid_list->row_cnt < rowid_list->max_row) {
rowid_list->row_cnt++;
} else {
copy_size = max_size;
}
if (max_size != 0) {
MEMS_RETURN_IFERR(memmove_s(&rowid_list->rid[pos + 1], max_size, &rowid_list->rid[pos], copy_size));
}
rowid_list->rid[pos] = rid;
row_size = (uint32)((row_head_t *)row)->size * rowid_list->max_row;
utilization = (double)(row_size / OG_VMEM_PAGE_SIZE + 1) / seg->vm_list.count;
if (utilization < 0.1) {
OG_RETURN_IFERR(sql_save_result_into_page(ogx, seg_id, rowid_list));
}
return OG_SUCCESS;
}
static inline status_t sql_insert_sort_row(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan, char *buf,
char *sort_buf, mtrl_rowid_list_t *rowid_list)
{
mtrl_rowid_t rid;
OG_RETURN_IFERR(sql_make_mtrl_rs_row(stmt, cursor->mtrl.rs.buf, plan->query_sort.select_columns, buf));
OG_RETURN_IFERR(mtrl_insert_row(&stmt->mtrl, cursor->mtrl.rs.sid, buf, &rid));
OG_RETURN_IFERR(sql_mtrl_insert_sort_row(&stmt->mtrl, cursor->mtrl.sort.sid, sort_buf, rid, rowid_list));
return OG_SUCCESS;
}
static inline status_t judge_row_is_candidate(sql_stmt_t *stmt, sql_cursor_t *cursor, galist_t *sort_items,
char *sort_buf, mtrl_rowid_list_t *rowid_list, bool32 *candidate)
{
row_assist_t ra;
row_init(&ra, sort_buf, OG_MAX_ROW_SIZE, sort_items->count);
OG_RETURN_IFERR(sql_make_mtrl_sort_row(stmt, cursor->mtrl.sort.buf, sort_items, &ra));
OG_RETURN_IFERR(sql_cmp_with_top_row(&stmt->mtrl, cursor->mtrl.sort.sid, rowid_list, sort_buf, candidate));
return OG_SUCCESS;
}
static status_t sql_mtrl_query_sort_by_rownum(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
bool32 eof = OG_FALSE;
char *buf = NULL;
char *s_buf = NULL;
status_t status = OG_ERROR;
mtrl_rowid_list_t *rowid_list = NULL;
bool32 candidate = OG_TRUE;
if (plan->query_sort.rownum_upper == 0) {
cursor->eof = OG_TRUE;
return OG_SUCCESS;
}
uint32 size = sizeof(mtrl_rowid_t) * plan->query_sort.rownum_upper + sizeof(mtrl_rowid_list_t);
OGSQL_SAVE_STACK(stmt);
OG_RETURN_IFERR(sql_push(stmt, size, (void **)&rowid_list));
OG_RETURN_IFERR(sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&buf));
OG_RETURN_IFERR(sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&s_buf));
rowid_list->row_cnt = 0;
rowid_list->max_row = plan->query_sort.rownum_upper;
for (;;) {
OGSQL_SAVE_STACK(stmt);
if (sql_fetch_query(stmt, cursor, plan->query_sort.next, &eof) != OG_SUCCESS) {
OGSQL_RESTORE_STACK(stmt);
break;
}
if (eof) {
OGSQL_RESTORE_STACK(stmt);
status = OG_SUCCESS;
break;
}
if (judge_row_is_candidate(stmt, cursor, plan->query_sort.items, s_buf, rowid_list, &candidate) != OG_SUCCESS) {
OGSQL_RESTORE_STACK(stmt);
break;
}
if (!candidate) {
OGSQL_RESTORE_STACK(stmt);
continue;
}
if (sql_insert_sort_row(stmt, cursor, plan, buf, s_buf, rowid_list) != OG_SUCCESS) {
OGSQL_RESTORE_STACK(stmt);
break;
}
OGSQL_RESTORE_STACK(stmt);
}
if (status == OG_SUCCESS) {
status = sql_save_result_into_page(&stmt->mtrl, cursor->mtrl.sort.sid, rowid_list);
}
OGSQL_RESTORE_STACK(stmt);
OG_RETURN_IFERR(sql_free_query_mtrl(stmt, cursor, plan->query_sort.next));
return status;
}
status_t sql_mtrl_query_sort(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
bool32 eof = OG_FALSE;
char *buf = NULL;
mtrl_rowid_t rid;
status_t status = OG_SUCCESS;
if (plan->query_sort.rownum_upper <= g_instance->sql.topn_threshold && g_instance->sql.topn_threshold != 0) {
mtrl_reset_sort_type(&stmt->mtrl, cursor->mtrl.sort.sid);
return sql_mtrl_query_sort_by_rownum(stmt, cursor, plan);
}
OG_RETURN_IFERR(sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&buf));
for (;;) {
OGSQL_SAVE_STACK(stmt);
if (sql_fetch_query(stmt, cursor, plan->query_sort.next, &eof) != OG_SUCCESS) {
OGSQL_RESTORE_STACK(stmt);
status = OG_ERROR;
break;
}
if (eof) {
OGSQL_RESTORE_STACK(stmt);
break;
}
if (sql_make_mtrl_rs_row(stmt, cursor->mtrl.rs.buf, plan->query_sort.select_columns, buf) != OG_SUCCESS) {
OGSQL_RESTORE_STACK(stmt);
status = OG_ERROR;
break;
}
if (mtrl_insert_row(&stmt->mtrl, cursor->mtrl.rs.sid, buf, &rid) != OG_SUCCESS) {
OGSQL_RESTORE_STACK(stmt);
status = OG_ERROR;
break;
}
if (sql_make_mtrl_query_sort_row(stmt, cursor->mtrl.sort.buf, plan->query_sort.items, &rid, buf) !=
OG_SUCCESS) {
OGSQL_RESTORE_STACK(stmt);
status = OG_ERROR;
break;
}
if (mtrl_insert_row(&stmt->mtrl, cursor->mtrl.sort.sid, buf, &rid) != OG_SUCCESS) {
OGSQL_RESTORE_STACK(stmt);
status = OG_ERROR;
break;
}
OGSQL_RESTORE_STACK(stmt);
}
OGSQL_POP(stmt);
OG_RETURN_IFERR(sql_free_query_mtrl(stmt, cursor, plan->query_sort.next));
return status;
}
static status_t sql_mtrl_select_sort(sql_stmt_t *stmt, sql_cursor_t *cursor, sql_cursor_t *sub_cursor,
plan_node_t *plan)
{
bool32 eof = OG_FALSE;
char *buf = NULL;
mtrl_rowid_t rid;
status_t status = OG_SUCCESS;
if (cursor->exec_data.select_limit != NULL) {
mtrl_reset_sort_type(&stmt->mtrl, cursor->mtrl.sort.sid);
}
OG_RETURN_IFERR(SQL_CURSOR_PUSH(stmt, sub_cursor));
OG_RETURN_IFERR(sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&buf));
for (;;) {
OGSQL_SAVE_STACK(stmt);
if (sql_fetch_cursor(stmt, sub_cursor, plan->select_sort.next, &eof) != OG_SUCCESS) {
OGSQL_RESTORE_STACK(stmt);
status = OG_ERROR;
break;
}
if (eof) {
OGSQL_RESTORE_STACK(stmt);
break;
}
if (sql_make_mtrl_rs_row(stmt, sub_cursor->mtrl.rs.buf, sub_cursor->columns, buf) != OG_SUCCESS) {
OGSQL_RESTORE_STACK(stmt);
status = OG_ERROR;
break;
}
if (mtrl_insert_row(&stmt->mtrl, cursor->mtrl.rs.sid, buf, &rid) != OG_SUCCESS) {
OGSQL_RESTORE_STACK(stmt);
status = OG_ERROR;
break;
}
if (sql_make_mtrl_select_sort_row(stmt, sub_cursor->mtrl.rs.buf, sub_cursor, plan->select_sort.items, &rid,
buf) != OG_SUCCESS) {
OGSQL_RESTORE_STACK(stmt);
status = OG_ERROR;
break;
}
if (mtrl_insert_row(&stmt->mtrl, cursor->mtrl.sort.sid, buf, &rid) != OG_SUCCESS) {
OGSQL_RESTORE_STACK(stmt);
status = OG_ERROR;
break;
}
OGSQL_RESTORE_STACK(stmt);
}
OGSQL_POP(stmt);
SQL_CURSOR_POP(stmt);
OG_RETURN_IFERR(sql_free_query_mtrl(stmt, cursor, plan->select_sort.next));
return status;
}
status_t sql_sort_mtrl_record_types(vmc_t *vmc, mtrl_segment_type_t sort_type, galist_t *cmp_items, char **buf)
{
uint32 i;
og_type_t *types = NULL;
og_type_t data_type;
sort_item_t *item = NULL;
select_sort_item_t *sort_item = NULL;
rs_column_t *column = NULL;
expr_tree_t *expr = NULL;
if (*buf == NULL) {
uint32 mem_cost_size = cmp_items->count * sizeof(og_type_t);
mem_cost_size += sizeof(uint32);
OG_RETURN_IFERR(vmc_alloc(vmc, mem_cost_size, (void **)buf));
*(uint32 *)(*buf) = mem_cost_size;
}
types = (og_type_t *)(*buf + sizeof(uint32));
for (i = 0; i < cmp_items->count; i++) {
switch (sort_type) {
case MTRL_SEGMENT_QUERY_SORT:
case MTRL_SEGMENT_CONCAT_SORT:
case MTRL_SEGMENT_SIBL_SORT:
item = (sort_item_t *)cm_galist_get(cmp_items, i);
expr = item->expr;
data_type = expr->root->datatype;
break;
case MTRL_SEGMENT_SELECT_SORT:
sort_item = (select_sort_item_t *)cm_galist_get(cmp_items, i);
data_type = sort_item->datatype;
break;
case MTRL_SEGMENT_DISTINCT:
case MTRL_SEGMENT_RS:
column = (rs_column_t *)cm_galist_get(cmp_items, i);
data_type = column->datatype;
break;
default:
expr = (expr_tree_t *)cm_galist_get(cmp_items, i);
data_type = expr->root->datatype;
break;
}
types[i] = data_type;
}
return OG_SUCCESS;
}
status_t sql_sort_mtrl_open_segment(sql_stmt_t *stmt, sql_cursor_t *cursor, mtrl_segment_type_t sort_type,
galist_t *cmp_items)
{
if (mtrl_create_segment(&stmt->mtrl, MTRL_SEGMENT_RS, NULL, &cursor->mtrl.rs.sid) != OG_SUCCESS) {
return OG_ERROR;
}
if (mtrl_create_segment(&stmt->mtrl, sort_type, (handle_t)cmp_items, &cursor->mtrl.sort.sid) != OG_SUCCESS) {
return OG_ERROR;
}
if (mtrl_open_segment(&stmt->mtrl, cursor->mtrl.rs.sid) != OG_SUCCESS) {
return OG_ERROR;
}
if (mtrl_open_segment(&stmt->mtrl, cursor->mtrl.sort.sid) != OG_SUCCESS) {
mtrl_close_segment(&stmt->mtrl, cursor->mtrl.rs.sid);
return OG_ERROR;
}
OG_RETURN_IFERR(sql_sort_mtrl_record_types(&cursor->vmc, sort_type, cmp_items, &cursor->mtrl.sort.buf));
stmt->mtrl.segments[cursor->mtrl.sort.sid]->pending_type_buf = cursor->mtrl.sort.buf;
return OG_SUCCESS;
}
void sql_sort_mtrl_close_segment(sql_stmt_t *stmt, sql_cursor_t *cursor)
{
if (cursor->mtrl.rs.sid != OG_INVALID_ID32) {
mtrl_close_segment(&stmt->mtrl, cursor->mtrl.rs.sid);
}
if (cursor->mtrl.sort.sid != OG_INVALID_ID32) {
mtrl_close_segment(&stmt->mtrl, cursor->mtrl.sort.sid);
}
}
void sql_sort_mtrl_release_segment(sql_stmt_t *stmt, sql_cursor_t *cursor)
{
OGSQL_RELEASE_SEGMENT(stmt, cursor->mtrl.rs.sid);
OGSQL_RELEASE_SEGMENT(stmt, cursor->mtrl.sort.sid);
}
status_t sql_execute_select_sort(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
sql_cursor_t *sub_cur = NULL;
uint32 mem_size;
#ifdef TIME_STATISTIC
clock_t start;
double timeuse;
start = cm_cal_time_bengin();
#endif
sql_open_select_cursor(stmt, cursor, plan->select_sort.rs_columns);
if (sql_alloc_cursor(stmt, &sub_cur) != OG_SUCCESS) {
return OG_ERROR;
}
sub_cur->scn = cursor->scn;
sub_cur->ancestor_ref = cursor->ancestor_ref;
if (sql_execute_select_plan(stmt, sub_cur, plan->select_sort.next) != OG_SUCCESS) {
sql_free_cursor(stmt, sub_cur);
return OG_ERROR;
}
if (sql_sort_mtrl_open_segment(stmt, cursor, MTRL_SEGMENT_SELECT_SORT, plan->select_sort.items) != OG_SUCCESS) {
sql_free_cursor(stmt, sub_cur);
return OG_ERROR;
}
if (sql_mtrl_select_sort(stmt, cursor, sub_cur, plan) != OG_SUCCESS) {
sql_sort_mtrl_close_segment(stmt, cursor);
sql_free_cursor(stmt, sub_cur);
return OG_ERROR;
}
sql_sort_mtrl_close_segment(stmt, cursor);
if (cursor->mtrl.rs.buf == NULL && sub_cur->mtrl.rs.buf != NULL) {
mem_size = *(uint32 *)sub_cur->mtrl.rs.buf;
OG_RETURN_IFERR(vmc_alloc(&cursor->vmc, mem_size, (void **)&cursor->mtrl.rs.buf));
MEMS_RETURN_IFERR(memcpy_s(cursor->mtrl.rs.buf, mem_size, sub_cur->mtrl.rs.buf, mem_size));
}
sql_free_cursor(stmt, sub_cur);
if (mtrl_sort_segment(&stmt->mtrl, cursor->mtrl.sort.sid) != OG_SUCCESS) {
return OG_ERROR;
}
if (mtrl_open_cursor(&stmt->mtrl, cursor->mtrl.sort.sid, &cursor->mtrl.cursor) != OG_SUCCESS) {
return OG_ERROR;
}
#ifdef TIME_STATISTIC
timeuse = cm_cal_time_end(start);
stmt->mt_time += timeuse;
#endif
return OG_SUCCESS;
}
status_t sql_execute_query_sort(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
status_t status = OG_ERROR;
#ifdef TIME_STATISTIC
clock_t start;
double timeuse;
start = cm_cal_time_bengin();
#endif
do {
OG_BREAK_IF_ERROR(sql_execute_query_plan(stmt, cursor, plan->query_sort.next));
if (cursor->eof) {
status = OG_SUCCESS;
break;
}
OG_BREAK_IF_ERROR(sql_sort_mtrl_open_segment(stmt, cursor, MTRL_SEGMENT_QUERY_SORT, plan->query_sort.items));
if (sql_mtrl_query_sort(stmt, cursor, plan) != OG_SUCCESS) {
sql_sort_mtrl_close_segment(stmt, cursor);
break;
}
sql_sort_mtrl_close_segment(stmt, cursor);
if (cursor->eof) {
status = OG_SUCCESS;
break;
}
OG_BREAK_IF_ERROR(mtrl_sort_segment(&stmt->mtrl, cursor->mtrl.sort.sid));
OG_BREAK_IF_ERROR(mtrl_open_cursor(&stmt->mtrl, cursor->mtrl.sort.sid, &cursor->mtrl.cursor));
status = OG_SUCCESS;
} while (0);
#ifdef TIME_STATISTIC
timeuse = cm_cal_time_end(start);
stmt->mt_time += timeuse;
#endif
return status;
}
status_t sql_put_sort_row(sql_stmt_t *stmt, sql_cursor_t *cursor, bool32 *is_full)
{
char *row = cursor->mtrl.cursor.row.data;
OG_RETURN_IFERR(my_sender(stmt)->send_row_data(stmt, row, is_full));
sql_inc_rows(stmt, cursor);
return OG_SUCCESS;
}
static status_t sql_alloc_mtrl_cursor(sql_stmt_t *stmt, sql_cursor_t *cursor, mtrl_cursor_t **mtrl_cursor, mtrl_rowid_t *rid)
{
if (cursor->mtrl.sibl_sort.cursor_sid == OG_INVALID_ID32) {
OG_RETURN_IFERR(
mtrl_create_segment(&stmt->mtrl, MTRL_SEGMENT_SIBL_SORT, NULL, &cursor->mtrl.sibl_sort.cursor_sid));
OG_RETURN_IFERR(mtrl_open_segment(&stmt->mtrl, cursor->mtrl.sibl_sort.cursor_sid));
}
OG_RETURN_IFERR(sql_alloc_mem_from_seg(stmt, stmt->mtrl.segments[cursor->mtrl.sibl_sort.cursor_sid],
sizeof(mtrl_cursor_t), (void **)mtrl_cursor, rid));
MEMS_RETURN_IFERR(memset_s((*mtrl_cursor), sizeof(mtrl_cursor_t), 0, sizeof(mtrl_cursor_t)));
return OG_SUCCESS;
}
* 1.if next_level_cursor is not eof, fetch from next_level_cursor and return.
2.if curr_level_cursor is eof, fetch from pre_level_cursor and return.
3.fetch from curr_level_cursor, if fetched row is a non-leaf node, sort its sub-segment and open next_level_cursor.
*/
static status_t sql_fetch_sibl_sort_one_record(sql_stmt_t *stmt, sql_cursor_t *cursor)
{
sibl_sort_row_t sibl_row;
mtrl_segment_t *seg = NULL;
mtrl_cursor_t *curr_cur = NULL;
mtrl_cursor_t *next_cur = NULL;
mtrl_cursor_t *pre_cur = NULL;
mtrl_rowid_t next_cursor_rid;
mtrl_rowid_t pre_cursor_rid;
vm_page_t *page = NULL;
OG_RETURN_IFERR(sql_stack_safe(stmt));
OG_RETURN_IFERR(sql_get_mtrl_cursor(stmt, cursor, &cursor->mtrl.cursor.curr_cursor_rid, &curr_cur));
next_cursor_rid = curr_cur->next_cursor_rid;
pre_cursor_rid = curr_cur->pre_cursor_rid;
OG_RETURN_IFERR(sql_get_mtrl_cursor(stmt, cursor, &next_cursor_rid, &next_cur));
if (next_cur != NULL && !next_cur->eof) {
cursor->mtrl.cursor.curr_cursor_rid = next_cursor_rid;
return sql_fetch_sibl_sort_one_record(stmt, cursor);
}
OG_RETURN_IFERR(sql_get_mtrl_cursor(stmt, cursor, &cursor->mtrl.cursor.curr_cursor_rid, &curr_cur));
OG_RETURN_IFERR(mtrl_fetch_sibl_sort(&stmt->mtrl, &cursor->mtrl.cursor, curr_cur, &sibl_row));
if (curr_cur->eof) {
OG_RETURN_IFERR(sql_get_mtrl_cursor(stmt, cursor, &pre_cursor_rid, &pre_cur));
if (pre_cur == NULL) {
cursor->mtrl.cursor.eof = OG_TRUE;
return OG_SUCCESS;
}
if (mtrl_open_page(&stmt->mtrl, pre_cur->sort.vmid, &page) != OG_SUCCESS) {
return OG_ERROR;
}
pre_cur->sort.page = (mtrl_page_t *)page->data;
cursor->mtrl.cursor.curr_cursor_rid = pre_cursor_rid;
return sql_fetch_sibl_sort_one_record(stmt, cursor);
}
if (sibl_row.child_seg_rid.vmid != OG_INVALID_ID32) {
if (next_cursor_rid.vmid == OG_INVALID_ID32) {
OG_RETURN_IFERR(sql_alloc_mtrl_cursor(stmt, cursor, &next_cur, &next_cursor_rid));
} else {
OG_RETURN_IFERR(sql_get_mtrl_cursor(stmt, cursor, &next_cursor_rid, &next_cur));
}
OG_RETURN_IFERR(sql_get_segment_in_vm(stmt, cursor->mtrl.sibl_sort.sid, &sibl_row.child_seg_rid, &seg));
OG_RETURN_IFERR(mtrl_sort_segment2(&stmt->mtrl, seg));
if (mtrl_open_cursor2(&stmt->mtrl, seg, next_cur) != OG_SUCCESS) {
return OG_ERROR;
}
next_cur->pre_cursor_rid = cursor->mtrl.cursor.curr_cursor_rid;
OG_RETURN_IFERR(sql_get_mtrl_cursor(stmt, cursor, &cursor->mtrl.cursor.curr_cursor_rid, &curr_cur));
curr_cur->next_cursor_rid = next_cursor_rid;
mtrl_close_page(&stmt->mtrl, curr_cur->sort.vmid);
}
if (!sibl_row.match_filter) {
return sql_fetch_sibl_sort_one_record(stmt, cursor);
}
return OG_SUCCESS;
}
status_t sql_fetch_sibl_sort(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan, bool32 *eof)
{
if (sql_fetch_sibl_sort_one_record(stmt, cursor) != OG_SUCCESS) {
return OG_ERROR;
}
*eof = cursor->mtrl.cursor.eof;
return OG_SUCCESS;
}
static status_t sql_alloc_mtrl_sibl_sort_seg(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan,
mtrl_rowid_t *sub_seg_rid)
{
mtrl_segment_t *sub_seg = NULL;
if (cursor->mtrl.sibl_sort.sid == OG_INVALID_ID32) {
OG_RETURN_IFERR(mtrl_create_segment(&stmt->mtrl, MTRL_SEGMENT_SIBL_SORT, NULL, &cursor->mtrl.sibl_sort.sid));
OG_RETURN_IFERR(mtrl_open_segment(&stmt->mtrl, cursor->mtrl.sibl_sort.sid));
}
OG_RETURN_IFERR(sql_alloc_segment_in_vm(stmt, cursor->mtrl.sibl_sort.sid, &sub_seg, sub_seg_rid));
mtrl_init_segment(sub_seg, MTRL_SEGMENT_SIBL_SORT, plan->query_sort.items);
OG_RETURN_IFERR(mtrl_extend_segment(&stmt->mtrl, sub_seg));
OG_RETURN_IFERR(mtrl_open_page(&stmt->mtrl, sub_seg->vm_list.last, &sub_seg->curr_page));
mtrl_init_page((mtrl_page_t *)sub_seg->curr_page->data, sub_seg->vm_list.last);
mtrl_close_segment2(&stmt->mtrl, sub_seg);
return OG_SUCCESS;
}
static status_t sql_get_last_sibl_sort_row(galist_t *last_sibl_sort_rows, uint32 level, sibl_sort_row_t **sibl_sort_row)
{
*sibl_sort_row = NULL;
if (last_sibl_sort_rows->count >= level) {
*sibl_sort_row = cm_galist_get(last_sibl_sort_rows, level - 1);
}
return OG_SUCCESS;
}
static status_t sql_set_last_sibl_sort_row(galist_t *last_sibl_sort_rows, uint32 level, sibl_sort_row_t *sibl_sort_row)
{
sibl_sort_row_t *row = NULL;
if (last_sibl_sort_rows->count < level) {
OG_RETURN_IFERR(cm_galist_new(last_sibl_sort_rows, sizeof(sibl_sort_row_t), (pointer_t *)&row));
} else {
row = cm_galist_get(last_sibl_sort_rows, level - 1);
}
*row = *sibl_sort_row;
return OG_SUCCESS;
}
* 1. for root node data(level == 1),cache to the default sort segment(cursor->mtrl.sort.sid);
2. for child node data, cache to sub-segment created by the parent node.
*/
static status_t sql_insert_mtrl_sibl_sort_row(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan, char *buf,
sibl_sort_row_t *sibl_sort_row, galist_t *last_sibl_sort_rows, bool8 last_is_leaf)
{
mtrl_rowid_t sort_rid;
mtrl_segment_t *seg = NULL;
sibl_sort_row_t *last_sibl_sort_row = NULL;
sql_cursor_t *cur_level_cursor = cursor->connect_data.first_level_cursor->connect_data.cur_level_cursor;
if (cur_level_cursor->connect_data.level == 1) {
OG_RETURN_IFERR(
sql_make_mtrl_sibl_sort_row(stmt, cursor->mtrl.sort.buf, plan->query_sort.items, buf, sibl_sort_row));
OG_RETURN_IFERR(mtrl_insert_row(&stmt->mtrl, cursor->mtrl.sort.sid, buf, &sort_rid));
} else {
if (!last_is_leaf) {
OG_RETURN_IFERR(sql_get_last_sibl_sort_row(last_sibl_sort_rows, cur_level_cursor->connect_data.level - 1,
&last_sibl_sort_row));
sibl_sort_row->sibling_seg_rid = last_sibl_sort_row->child_seg_rid;
} else {
OG_RETURN_IFERR(sql_get_last_sibl_sort_row(last_sibl_sort_rows, cur_level_cursor->connect_data.level,
&last_sibl_sort_row));
sibl_sort_row->sibling_seg_rid = last_sibl_sort_row->sibling_seg_rid;
}
OG_RETURN_IFERR(
sql_get_segment_in_vm(stmt, cursor->mtrl.sibl_sort.sid, &sibl_sort_row->sibling_seg_rid, &seg));
OG_RETURN_IFERR(
sql_make_mtrl_sibl_sort_row(stmt, cursor->mtrl.sort.buf, plan->query_sort.items, buf, sibl_sort_row));
OG_RETURN_IFERR(mtrl_open_page(&stmt->mtrl, seg->vm_list.last, &seg->curr_page));
if (mtrl_insert_row2(&stmt->mtrl, seg, buf, &sort_rid) != OG_SUCCESS) {
mtrl_close_segment2(&stmt->mtrl, seg);
return OG_ERROR;
}
mtrl_close_segment2(&stmt->mtrl, seg);
}
return OG_SUCCESS;
}
static void sql_init_sibl_sort_row(sibl_sort_row_t *sibl_sort_row)
{
sibl_sort_row->rs_rid.slot = OG_INVALID_ID32;
sibl_sort_row->rs_rid.vmid = OG_INVALID_ID32;
sibl_sort_row->child_seg_rid.slot = OG_INVALID_ID32;
sibl_sort_row->child_seg_rid.vmid = OG_INVALID_ID32;
sibl_sort_row->sibling_seg_rid.slot = OG_INVALID_ID32;
sibl_sort_row->sibling_seg_rid.vmid = OG_INVALID_ID32;
sibl_sort_row->match_filter = OG_TRUE;
}
static status_t sql_mtrl_query_sibl_sort(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
bool32 eof = OG_FALSE;
char *buf = NULL;
status_t status = OG_ERROR;
sql_cursor_t *cur_level_cursor = NULL;
sibl_sort_row_t sibl_sort_row;
galist_t *last_sibl_sort_rows = NULL;
bool8 last_is_leaf = OG_FALSE;
vmc_t vmc;
vmc_init(&stmt->session->vmp, &vmc);
OG_RETURN_IFERR(vmc_alloc(&vmc, sizeof(galist_t), (void **)&last_sibl_sort_rows));
cm_galist_init(last_sibl_sort_rows, &vmc, vmc_alloc);
if (sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&buf) != OG_SUCCESS) {
vmc_free(&vmc);
return OG_ERROR;
}
OGSQL_SAVE_STACK(stmt);
do {
sql_init_sibl_sort_row(&sibl_sort_row);
OG_BREAK_IF_ERROR(sql_fetch_query(stmt, cursor, plan->query_sort.next, &eof));
if (eof) {
status = OG_SUCCESS;
break;
}
OG_BREAK_IF_ERROR(sql_make_mtrl_rs_row(stmt, cursor->mtrl.rs.buf, plan->query_sort.select_columns, buf));
OG_BREAK_IF_ERROR(mtrl_insert_row(&stmt->mtrl, cursor->mtrl.rs.sid, buf, &sibl_sort_row.rs_rid));
if (cursor->query != NULL && cursor->query->filter_cond != NULL) {
OG_BREAK_IF_ERROR(sql_match_cond_node(stmt, cursor->query->filter_cond->root, &sibl_sort_row.match_filter));
}
cur_level_cursor = cursor->connect_data.first_level_cursor->connect_data.cur_level_cursor;
if (!cur_level_cursor->connect_data.connect_by_isleaf) {
OG_BREAK_IF_ERROR(sql_alloc_mtrl_sibl_sort_seg(stmt, cursor, plan, &sibl_sort_row.child_seg_rid));
}
OG_BREAK_IF_ERROR(
sql_insert_mtrl_sibl_sort_row(stmt, cursor, plan, buf, &sibl_sort_row, last_sibl_sort_rows, last_is_leaf));
OG_BREAK_IF_ERROR(
sql_set_last_sibl_sort_row(last_sibl_sort_rows, cur_level_cursor->connect_data.level, &sibl_sort_row));
last_is_leaf = cur_level_cursor->connect_data.connect_by_isleaf;
cursor->rownum++;
OGSQL_RESTORE_STACK(stmt);
} while (OG_TRUE);
cursor->rownum = 1;
OGSQL_RESTORE_STACK(stmt);
OGSQL_POP(stmt);
vmc_free(&vmc);
return status;
}
static void sql_sibl_sort_mtrl_close_segment(sql_stmt_t *stmt, sql_cursor_t *cursor)
{
sql_sort_mtrl_close_segment(stmt, cursor);
if (cursor->mtrl.sibl_sort.sid != OG_INVALID_ID32) {
mtrl_close_segment(&stmt->mtrl, cursor->mtrl.sibl_sort.sid);
}
}
status_t sql_execute_query_sibl_sort(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
status_t status = OG_ERROR;
mtrl_cursor_t *curr_cursor = NULL;
#ifdef TIME_STATISTIC
clock_t start;
double timeuse;
start = cm_cal_time_bengin();
#endif
do {
OG_BREAK_IF_ERROR(sql_execute_query_plan(stmt, cursor, plan->query_sort.next));
if (cursor->eof) {
status = OG_SUCCESS;
break;
}
OG_BREAK_IF_ERROR(sql_sort_mtrl_open_segment(stmt, cursor, MTRL_SEGMENT_SIBL_SORT, plan->query_sort.items));
if (sql_mtrl_query_sibl_sort(stmt, cursor, plan) != OG_SUCCESS) {
sql_sibl_sort_mtrl_close_segment(stmt, cursor);
break;
}
sql_sort_mtrl_close_segment(stmt, cursor);
if (cursor->eof) {
status = OG_SUCCESS;
break;
}
OG_BREAK_IF_ERROR(mtrl_sort_segment(&stmt->mtrl, cursor->mtrl.sort.sid));
OG_BREAK_IF_ERROR(sql_alloc_mtrl_cursor(stmt, cursor, &curr_cursor, &cursor->mtrl.cursor.curr_cursor_rid));
OG_BREAK_IF_ERROR(mtrl_open_cursor(&stmt->mtrl, cursor->mtrl.sort.sid, curr_cursor));
curr_cursor->curr_cursor_rid = cursor->mtrl.cursor.curr_cursor_rid;
status = OG_SUCCESS;
} while (0);
#ifdef TIME_STATISTIC
timeuse = cm_cal_time_end(start);
stmt->mt_time += timeuse;
#endif
return status;
}
status_t sql_fetch_sort_for_minus(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan, bool32 *eof)
{
bool32 group_changed = OG_FALSE;
if (cursor->mtrl.cursor.eof) {
*eof = OG_TRUE;
return OG_SUCCESS;
}
if (!(cursor->mtrl.cursor.eof)) {
if (mtrl_fetch_group(&stmt->mtrl, &cursor->mtrl.cursor, &group_changed) != OG_SUCCESS) {
return OG_ERROR;
}
}
*eof = cursor->mtrl.cursor.eof;
return OG_SUCCESS;
}