* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* ogsql_connect_mtrl.c
*
*
* IDENTIFICATION
* src/ogsql/executor/ogsql_connect_mtrl.c
*
* -------------------------------------------------------------------------
*/
#include "ogsql_connect_mtrl.h"
#include "ogsql_select.h"
#include "ogsql_mtrl.h"
#include "ogsql_join_comm.h"
#include "srv_instance.h"
/*
 * Accessors for the CONNECT BY materialization context. The context is always reached through
 * the first-level cursor recorded in `cursor->connect_data`. Every macro below except
 * CB_MTRL_CONTEXT expands to an expression that names a variable called `cursor`, so such a
 * variable must be in scope wherever they are used.
 */
#define CB_MTRL_CONTEXT(cursor) ((cursor)->connect_data.first_level_cursor->cb_mtrl_ctx)
/* Plan pointer (cb_mtrl_p) stored in the context. */
#define CB_MTRL_PLAN (CB_MTRL_CONTEXT(cursor)->cb_mtrl_p)
/* Address of the hash segment embedded in the context. */
#define CB_MTRL_SEGMENT (&CB_MTRL_CONTEXT(cursor)->hash_segment)
/* Address of the hash table embedded in the context. */
#define CB_MTRL_TABLE_ENTRY (&CB_MTRL_CONTEXT(cursor)->hash_table)
/* Helper cursors cached in the context (last / current / next level). */
#define CB_MTRL_LAST_CURSOR (CB_MTRL_CONTEXT(cursor)->last_cursor)
#define CB_MTRL_CURR_CURSOR (CB_MTRL_CONTEXT(cursor)->curr_cursor)
#define CB_MTRL_NEXT_CURSOR (CB_MTRL_CONTEXT(cursor)->next_cursor)
/* Address of the scratch hash iterator embedded in the context. */
#define CB_MTRL_TEMP_ITER (&CB_MTRL_CONTEXT(cursor)->iter)
/* Level number of the first level below level 1 (level 1 is fetched by the first-level cursor scan). */
#define CB_MTRL_SECOND_LEVEL 2
/* Forward declaration: defined near the end of this file, used by sql_connect_mtrl_get_next_level. */
static status_t sql_connect_mtrl_get_first_entry(sql_stmt_t *stmt, sql_cursor_t *cursor, hash_table_iter_t *iter);
/* Returns the level counter (curr_level) kept in the connect-by materialization context. */
static inline uint32 sql_connect_mtrl_get_level(sql_cursor_t *cursor)
{
    cb_mtrl_ctx_t *ogx = CB_MTRL_CONTEXT(cursor);

    return ogx->curr_level;
}
/*
 * Returns the per-level bookkeeping record for `level`.
 *
 * Levels are 1-based while the backing list is 0-based, so the record lives at index
 * `level - 1`. The assertion covers both bounds: level 0 would otherwise wrap around to a
 * huge unsigned index.
 */
static inline cb_mtrl_data_t *sql_connect_mtrl_get_cb_data(sql_cursor_t *cursor, uint32 level)
{
    galist_t *levels = CB_MTRL_CONTEXT(cursor)->cb_data;

    CM_ASSERT(level >= 1 && level <= levels->count);
    return (cb_mtrl_data_t *)(cm_galist_get(levels, level - 1));
}
/* Returns the address of the hash iterator stored in the bookkeeping record of `level`. */
static inline hash_table_iter_t *sql_connect_mtrl_get_iter(sql_cursor_t *cursor, uint32 level)
{
    cb_mtrl_data_t *level_data = sql_connect_mtrl_get_cb_data(cursor, level);

    return &level_data->iter;
}
/* Returns the address of the prior-row id stored in the bookkeeping record of `level`. */
static inline mtrl_rowid_t *sql_connect_mtrl_get_prior_row(sql_cursor_t *cursor, uint32 level)
{
    cb_mtrl_data_t *level_data = sql_connect_mtrl_get_cb_data(cursor, level);

    return &level_data->prior_row;
}
/*
 * Yields the bookkeeping record for `level` through `cb_mtrl_data`.
 * An existing record is reused; when `level` exceeds the number of records in the list,
 * one new record is appended instead.
 */
static inline status_t sql_connect_mtrl_alloc_cb_data(sql_cursor_t *cursor, uint32 level, cb_mtrl_data_t **cb_mtrl_data)
{
    if (level <= CB_MTRL_CONTEXT(cursor)->cb_data->count) {
        *cb_mtrl_data = sql_connect_mtrl_get_cb_data(cursor, level);
        return OG_SUCCESS;
    }

    return cm_galist_new(CB_MTRL_CONTEXT(cursor)->cb_data, sizeof(cb_mtrl_data_t), (pointer_t *)cb_mtrl_data);
}
/* Increments the level counter kept in the materialization context. */
static inline void sql_connect_mtrl_add_level(sql_cursor_t *cursor)
{
    cb_mtrl_ctx_t *ogx = CB_MTRL_CONTEXT(cursor);

    ogx->curr_level++;
}
/*
 * Leaves `level`: frees the prior row stored for it (when one is valid), marks the slot
 * invalid, and decrements the level counter. If freeing fails the counter is left untouched.
 */
static inline status_t sql_connect_mtrl_delete_level(sql_stmt_t *stmt, sql_cursor_t *cursor, uint32 level)
{
    mtrl_rowid_t *prior = sql_connect_mtrl_get_prior_row(cursor, level);

    if (IS_VALID_MTRL_ROWID(*prior)) {
        OG_RETURN_IFERR(vmctx_free(GET_VM_CTX(stmt), prior));
        *prior = g_invalid_entry;
    }

    CB_MTRL_CONTEXT(cursor)->curr_level--;
    return OG_SUCCESS;
}
/*
 * Pushes `dst_cursor` onto the statement's cursor stack, preceded by its last-level cursor
 * when one is set. sql_connect_mtrl_pop_cursor() undoes exactly this sequence.
 *
 * If the second push fails, the first one is rolled back so that a failed call leaves the
 * cursor stack as it found it (callers do not pop after a failed push).
 */
static inline status_t sql_connect_mtrl_push_cursor(sql_stmt_t *stmt, sql_cursor_t *dst_cursor)
{
    sql_cursor_t *parent = dst_cursor->connect_data.last_level_cursor;

    if (parent != NULL) {
        OG_RETURN_IFERR(SQL_CURSOR_PUSH(stmt, parent));
    }

    if (SQL_CURSOR_PUSH(stmt, dst_cursor) != OG_SUCCESS) {
        if (parent != NULL) {
            SQL_CURSOR_POP(stmt);
        }
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Counterpart of sql_connect_mtrl_push_cursor(): pops `dst_cursor`, and pops once more
 * when `dst_cursor` has a last-level cursor set.
 */
static inline void sql_connect_mtrl_pop_cursor(sql_stmt_t *ogsql_stmt, sql_cursor_t *dst_cursor)
{
    SQL_CURSOR_POP(ogsql_stmt);

    if (dst_cursor->connect_data.last_level_cursor == NULL) {
        return;
    }
    SQL_CURSOR_POP(ogsql_stmt);
}
/*
 * Loads the rows addressed by `rids` into the cursor's join row addresses.
 * The hash-join materialization context is used when the cursor has one; otherwise the
 * statement-level context is used.
 */
static status_t sql_fetch_vm_row(sql_stmt_t *stmt, sql_cursor_t *cursor, mtrl_rowid_t *rids)
{
    mtrl_context_t *mtrl_ctx = &stmt->mtrl;
    row_addr_t *rows = cursor->exec_data.join;

    if ((cursor->hash_join_ctx != NULL) && (cursor->hash_join_ctx->mtrl_ctx != NULL)) {
        mtrl_ctx = cursor->hash_join_ctx->mtrl_ctx;
    }

    return sql_mtrl_fetch_tables_row(mtrl_ctx, &cursor->mtrl.cursor, rows, rids, cursor->table_count);
}
/*
 * Hash-table callback registered in sql_connect_mtrl_init_table().
 * `callback_ctx` is the cursor to load rows into. `old_buf` starts with a row (the key)
 * and the row ids are read from directly behind it, at offset `row_head->size`.
 * `new_buf`, `new_size`, `old_size` and `found` are not used here.
 */
static status_t sql_get_one_row(void *callback_ctx, const char *new_buf, uint32 new_size, const char *old_buf, uint32 old_size,
                                bool32 found)
{
    sql_cursor_t *hash_cursor = (sql_cursor_t *)callback_ctx;
    row_head_t *key_head = (row_head_t *)old_buf;
    mtrl_rowid_t *vmids = (mtrl_rowid_t *)(old_buf + key_head->size);

    return sql_fetch_vm_row(hash_cursor->stmt, hash_cursor, vmids);
}
/*
 * Allocates the context's hash table with a bucket count taken from
 * sql_get_plan_hash_rows(), then initializes it with sql_get_one_row as the callback and
 * `cursor` as the callback context.
 */
static inline status_t sql_connect_mtrl_init_table(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan,
                                                   cb_mtrl_ctx_t *ogx)
{
    uint32 buckets = sql_get_plan_hash_rows(stmt, plan);

    if (vm_hash_table_alloc(&ogx->hash_table, &ogx->hash_segment, buckets) != OG_SUCCESS) {
        return OG_ERROR;
    }
    return vm_hash_table_init(&ogx->hash_segment, &ogx->hash_table, NULL, sql_get_one_row, cursor);
}
/*
 * Creates the connect-by materialization context for `cursor`.
 *
 * The context structure is placed in the data area of a freshly allocated VM page from the
 * session's temp pool. Its per-level list is allocated from the cursor's vmc, and its hash
 * segment and hash table are initialized. On any failure after the page has been allocated,
 * the page is freed and OG_ERROR is returned; `cursor->cb_mtrl_ctx` is only set on success.
 */
static status_t sql_alloc_connect_mtrl_ctx(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    uint32 page_id;
    vm_page_t *vm_page = NULL;
    cb_mtrl_ctx_t *ogx = NULL;

    OG_RETURN_IFERR(vm_alloc(KNL_SESSION(stmt), KNL_SESSION(stmt)->temp_pool, &page_id));
    if (vm_open(KNL_SESSION(stmt), KNL_SESSION(stmt)->temp_pool, page_id, &vm_page) != OG_SUCCESS) {
        vm_free(KNL_SESSION(stmt), KNL_SESSION(stmt)->temp_pool, page_id);
        return OG_ERROR;
    }

    /* The context lives inside the page itself. */
    ogx = (cb_mtrl_ctx_t *)vm_page->data;
    ogx->vmid = page_id;
    ogx->cb_mtrl_p = &plan->cb_mtrl;
    ogx->hash_table_rs = OG_INVALID_ID32;
    ogx->key_types = NULL;
    ogx->empty = OG_TRUE;
    ogx->curr_level = 0;
    ogx->last_cursor = NULL;
    ogx->curr_cursor = NULL;
    ogx->next_cursor = NULL;

    if (vmc_alloc(&cursor->vmc, sizeof(galist_t), (void **)&ogx->cb_data) != OG_SUCCESS) {
        vm_free(KNL_SESSION(stmt), KNL_SESSION(stmt)->temp_pool, page_id);
        return OG_ERROR;
    }
    cm_galist_init(ogx->cb_data, &cursor->vmc, vmc_alloc);

    vm_hash_segment_init(KNL_SESSION(stmt), KNL_SESSION(stmt)->temp_pool, &ogx->hash_segment, PMA_POOL,
                         HASH_PAGES_HOLD, HASH_AREA_SIZE);
    if (sql_connect_mtrl_init_table(stmt, cursor, plan, ogx) != OG_SUCCESS) {
        vm_hash_segment_deinit(&ogx->hash_segment);
        vm_free(KNL_SESSION(stmt), KNL_SESSION(stmt)->temp_pool, page_id);
        return OG_ERROR;
    }

    cursor->cb_mtrl_ctx = ogx;
    return OG_SUCCESS;
}
/*
 * Materializes one result row per table in the plan's rs_tables and stores the resulting
 * row ids in `rids[0 .. rs_tables->count - 1]`.
 *
 * Tables whose subslct_tab_usage is not SUBSELECT_4_NORMAL_JOIN get a null row; all other
 * tables get a row built from `cursor` and inserted into the hash_table_rs segment.
 *
 * `rids_count` is the capacity of `rids`; an error is raised when the plan has more tables.
 * The temporary row buffer pushed on the statement stack is released on every exit path,
 * including failures inside the loop.
 */
static status_t sql_make_connect_mtrl_rs_row(sql_stmt_t *stmt, sql_cursor_t *cursor, cb_mtrl_ctx_t *ogx,
                                             mtrl_rowid_t *rids, uint32 rids_count)
{
    char *buf = NULL;
    sql_table_t *table = NULL;
    status_t status = OG_SUCCESS;
    sql_array_t *rs_tables = ogx->cb_mtrl_p->rs_tables;

    if (SECUREC_UNLIKELY(rs_tables->count > rids_count)) {
        OG_THROW_ERROR_EX(ERR_ASSERT_ERROR, "Join table count <= %u", rids_count);
        return OG_ERROR;
    }

    OGSQL_SAVE_STACK(stmt);
    OG_RETURN_IFERR(sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&buf));

    for (uint32 i = 0; i < rs_tables->count; ++i) {
        table = (sql_table_t *)sql_array_get(rs_tables, i);
        if (table->subslct_tab_usage != SUBSELECT_4_NORMAL_JOIN) {
            status = sql_make_mtrl_null_rs_row(&stmt->mtrl, ogx->hash_table_rs, &rids[i]);
        } else {
            status = sql_make_mtrl_table_rs_row(stmt, cursor, cursor->tables, table, buf, OG_MAX_ROW_SIZE);
            if (status == OG_SUCCESS) {
                status = mtrl_insert_row(&stmt->mtrl, ogx->hash_table_rs, buf, &rids[i]);
            }
        }
        if (status != OG_SUCCESS) {
            break;
        }
    }

    /* Restoring the saved stack position releases the row buffer on success and on error. */
    OGSQL_RESTORE_STACK(stmt);
    return status;
}
/*
 * Copies `buf_size` bytes from `in_buf` to the position directly behind the row that
 * starts at `row_buf` (offset row_head->size). The row head itself is not modified;
 * the combined length is reported through `size`.
 * Fails with ERR_EXCEED_MAX_ROW_SIZE when the combined length exceeds OG_MAX_ROW_SIZE.
 */
static inline status_t sql_mtrl_row_append_data(char *row_buf, uint32 *size, const char *in_buf, uint32 buf_size)
{
    row_head_t *head = (row_head_t *)row_buf;
    uint32 total = buf_size + head->size;

    if (total > OG_MAX_ROW_SIZE) {
        OG_THROW_ERROR(ERR_EXCEED_MAX_ROW_SIZE, total, OG_MAX_ROW_SIZE);
        return OG_ERROR;
    }

    MEMS_RETURN_IFERR(memcpy_sp(row_buf + head->size, OG_MAX_ROW_SIZE - head->size, in_buf, buf_size));
    *size = total;
    return OG_SUCCESS;
}
/*
 * Opens `query_cur` on the query of `cursor`.
 * `cursor` is installed as the last-level cursor of `query_cur` only for the duration of
 * sql_open_cursors(); afterwards the link is cleared and the query's condition is copied to
 * `query_cur->cond`, regardless of whether opening succeeded.
 */
static inline status_t sql_init_connect_mtrl_cursor(sql_stmt_t *stmt, sql_cursor_t *query_cur, sql_cursor_t *cursor)
{
    status_t open_status;

    query_cur->connect_data.last_level_cursor = cursor;
    open_status = sql_open_cursors(stmt, query_cur, cursor->query, CURSOR_ACTION_SELECT, OG_TRUE);
    query_cur->connect_data.last_level_cursor = NULL;

    query_cur->cond = cursor->query->cond;
    return open_status;
}
/* Builds a row in `buf` that consists of `key_count` NULL columns. */
static inline status_t sql_make_null_hash_key(char *buf, uint32 key_count)
{
    row_assist_t ra;
    uint32 remaining = key_count;

    row_init(&ra, buf, OG_MAX_ROW_SIZE, key_count);
    while (remaining > 0) {
        OG_RETURN_IFERR(row_put_null(&ra));
        remaining--;
    }
    return OG_SUCCESS;
}
static status_t sql_connect_mtrl_build(sql_stmt_t *stmt, sql_cursor_t *cursor, sql_cursor_t *query_cur,
plan_node_t *plan)
{
row_assist_t ra;
bool32 found = OG_FALSE, has_null = OG_FALSE, eof = OG_FALSE;
mtrl_rowid_t rids[OG_MAX_JOIN_TABLES];
char *row_buf = NULL;
uint32 row_size;
status_t status = OG_ERROR;
cb_mtrl_plan_t *cb_mtrl_p = &plan->cb_mtrl;
OG_RETURN_IFERR(
mtrl_create_segment(&stmt->mtrl, MTRL_SEGMENT_HASHMAP_RS, NULL, &CB_MTRL_CONTEXT(cursor)->hash_table_rs));
OG_RETURN_IFERR(mtrl_open_segment(&stmt->mtrl, CB_MTRL_CONTEXT(cursor)->hash_table_rs));
OG_RETURN_IFERR(sql_init_connect_mtrl_cursor(stmt, query_cur, cursor));
OG_RETURN_IFERR(SQL_CURSOR_PUSH(stmt, query_cur));
if (sql_execute_query_plan(stmt, query_cur, cb_mtrl_p->next) != OG_SUCCESS) {
SQL_CURSOR_POP(stmt);
return OG_ERROR;
}
OG_RETURN_IFERR(vmc_alloc(&cursor->vmc, sizeof(og_type_t) * cb_mtrl_p->key_exprs->count,
(void **)&CB_MTRL_CONTEXT(cursor)->key_types));
OG_RETURN_IFERR(sql_get_hash_key_types(stmt, query_cur->query, cb_mtrl_p->key_exprs, cb_mtrl_p->prior_exprs,
CB_MTRL_CONTEXT(cursor)->key_types));
if (sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&row_buf) != OG_SUCCESS) {
SQL_CURSOR_POP(stmt);
return OG_ERROR;
}
OGSQL_SAVE_STACK(stmt);
for (;;) {
OG_BREAK_IF_ERROR(sql_fetch_query(stmt, query_cur, cb_mtrl_p->next, &eof));
if (eof) {
status = OG_SUCCESS;
break;
}
OG_BREAK_IF_ERROR(
sql_make_connect_mtrl_rs_row(stmt, query_cur, CB_MTRL_CONTEXT(cursor), rids, OG_MAX_JOIN_TABLES));
OG_BREAK_IF_ERROR(
sql_make_hash_key(stmt, &ra, row_buf, cb_mtrl_p->key_exprs, CB_MTRL_CONTEXT(cursor)->key_types, &has_null));
if (has_null) {
OG_BREAK_IF_ERROR(sql_make_null_hash_key(row_buf, cb_mtrl_p->key_exprs->count));
}
OG_BREAK_IF_ERROR(sql_mtrl_row_append_data(row_buf, &row_size, (const char *)rids,
cb_mtrl_p->rs_tables->count * sizeof(mtrl_rowid_t)));
OG_BREAK_IF_ERROR(vm_hash_table_insert(&found, CB_MTRL_SEGMENT, CB_MTRL_TABLE_ENTRY, row_buf, row_size));
CB_MTRL_CONTEXT(cursor)->empty = OG_FALSE;
OGSQL_RESTORE_STACK(stmt);
}
OGSQL_RESTORE_STACK(stmt);
OGSQL_POP(stmt);
SQL_CURSOR_POP(stmt);
mtrl_close_segment(&stmt->mtrl, CB_MTRL_CONTEXT(cursor)->hash_table_rs);
return status;
}
/*
 * Prepares the join row addresses of `cursor`, one per table in the plan's rs_tables.
 * Does nothing when `cursor->exec_data.join` has already been allocated.
 *
 * Sub-select tables are bound to the row of their own sub cursor (which is also opened);
 * local tables are bound to the row of their kernel cursor. Any other table kind panics.
 */
static status_t sql_connect_mtrl_init_cursor(sql_stmt_t *stmt, sql_cursor_t *cursor)
{
    if (cursor->exec_data.join != NULL) {
        return OG_SUCCESS;
    }

    uint32 table_count = CB_MTRL_CONTEXT(cursor)->cb_mtrl_p->rs_tables->count;
    sql_table_cursor_t *tab_cur = NULL;
    mtrl_row_t *sub_row = NULL;

    cursor->last_table = OG_INVALID_ID32;
    OG_RETURN_IFERR(vmc_alloc(&cursor->vmc, sizeof(row_addr_t) * table_count, (void **)&cursor->exec_data.join));

    for (uint32 idx = 0; idx < table_count; ++idx) {
        tab_cur = &cursor->tables[idx];

        if (OG_IS_SUBSELECT_TABLE(tab_cur->table->type)) {
            sub_row = &tab_cur->sql_cur->mtrl.cursor.row;
            sql_init_row_addr(stmt, cursor, &sub_row->data, sub_row->offsets, sub_row->lens, NULL, NULL, idx);
            sql_open_select_cursor(stmt, tab_cur->sql_cur, tab_cur->sql_cur->plan->select_p.rs_columns);
            continue;
        }

        if (tab_cur->table->remote_type != REMOTE_TYPE_LOCAL) {
            knl_panic(0);
            continue;
        }

        sql_init_row_addr(stmt, cursor, (char **)&tab_cur->knl_cur->row, tab_cur->knl_cur->offsets,
                          tab_cur->knl_cur->lens, &tab_cur->knl_cur->rowid, NULL, idx);
    }
    return OG_SUCCESS;
}
/*
 * Lazily creates the helper cursor stored in `*dst_cursor`.
 * When the slot is already filled nothing happens. Otherwise a cursor is allocated straight
 * into the slot, seeded from `cursor` (scn, ancestor_ref, first-level cursor), opened on the
 * same query and given its join row addresses.
 */
static status_t sql_connect_mtrl_get_cursor(sql_stmt_t *stmt, sql_cursor_t *cursor, sql_cursor_t **dst_cursor)
{
    sql_cursor_t *helper = NULL;
    sql_cursor_t *first_level = NULL;

    if (*dst_cursor != NULL) {
        return OG_SUCCESS;
    }

    OG_RETURN_IFERR(sql_alloc_cursor(stmt, dst_cursor));
    helper = *dst_cursor;
    first_level = cursor->connect_data.first_level_cursor;

    helper->eof = OG_TRUE;
    helper->scn = cursor->scn;
    helper->ancestor_ref = cursor->ancestor_ref;
    helper->connect_data.last_level_cursor = first_level;
    helper->connect_data.next_level_cursor = NULL;
    helper->connect_data.first_level_cursor = first_level;

    OG_RETURN_IFERR(sql_open_cursors(stmt, helper, cursor->query, CURSOR_ACTION_SELECT, OG_TRUE));
    return sql_connect_mtrl_init_cursor(stmt, helper);
}
/*
 * Resets the bookkeeping record of a newly entered level: its iterator is initialized for
 * the next-level cursor, shares the hash table of the level-1 iterator and scans by key;
 * the level entry and the prior row are marked invalid.
 */
static inline void sql_connect_mtrl_init_next_data(sql_cursor_t *cursor, cb_mtrl_data_t *level_data)
{
    cb_mtrl_data_t *root_data = sql_connect_mtrl_get_cb_data(cursor, 1);
    hash_table_iter_t *level_iter = &level_data->iter;

    sql_init_hash_iter(level_iter, CB_MTRL_NEXT_CURSOR);
    level_iter->hash_table = root_data->iter.hash_table;
    level_iter->scan_mode = HASH_KEY_SCAN;

    level_data->prior_row = g_invalid_entry;
    level_data->level_entry.vmid = OG_INVALID_ID32;
}
/*
 * Positions the context's scratch iterator on `entry`, with `dst_cursor` as the callback
 * context, and performs one hash-table fetch. The end-of-scan flag of that fetch is ignored.
 */
static inline status_t sql_connect_mtrl_build_cursor(sql_cursor_t *cursor, sql_cursor_t *dst_cursor,
                                                     hash_entry_t *entry)
{
    hash_table_iter_t *scratch = CB_MTRL_TEMP_ITER;
    bool32 ignored_eof = OG_FALSE;

    scratch->callback_ctx = dst_cursor;
    scratch->curr_match = *entry;
    return vm_hash_table_fetch(&ignored_eof, CB_MTRL_SEGMENT, CB_MTRL_TABLE_ENTRY, scratch);
}
/*
 * Evaluates the PRIOR expressions of the first-level cursor against `dst_cursor` and packs
 * the values into a row in `buf` (capacity OG_MAX_ROW_SIZE) through `ra`.
 *
 * `dst_cursor` is pushed on the cursor stack for the evaluation and popped again before
 * returning, whether or not evaluation succeeded. LOB values are fetched before being stored.
 *
 * Returns OG_SUCCESS when every expression was stored; this includes an empty expression
 * list, which previously returned an uninitialized status.
 */
static status_t sql_connect_mtrl_make_prior_row(sql_stmt_t *stmt, sql_cursor_t *dst_cursor, char *buf, row_assist_t *ra)
{
    galist_t *prior_exprs = dst_cursor->connect_data.first_level_cursor->connect_data.prior_exprs;
    expr_node_t *node = NULL;
    status_t status = OG_SUCCESS;
    variant_t value;

    OG_RETURN_IFERR(SQL_CURSOR_PUSH(stmt, dst_cursor));
    row_init(ra, buf, OG_MAX_ROW_SIZE, prior_exprs->count);

    for (uint32 i = 0; i < prior_exprs->count; i++) {
        node = (expr_node_t *)cm_galist_get(prior_exprs, i);
        status = sql_exec_expr_node(stmt, node, &value);
        OG_BREAK_IF_ERROR(status);
        if (OG_IS_LOB_TYPE(value.type)) {
            status = sql_get_lob_value(stmt, &value);
            OG_BREAK_IF_ERROR(status);
        }
        status = sql_put_row_value(stmt, NULL, ra, value.type, &value);
        OG_BREAK_IF_ERROR(status);
    }

    SQL_CURSOR_POP(stmt);
    return status;
}
/*
 * Stores the PRIOR-expression row of `dst_cursor` as the prior row of `level`.
 * A prior row already stored for that level is freed first. The temporary row buffer is
 * taken from the statement stack, which is restored before returning.
 */
static status_t sql_connect_mtrl_insert_prior_row(sql_stmt_t *stmt, sql_cursor_t *dst_cursor, uint32 level)
{
    mtrl_rowid_t *slot = sql_connect_mtrl_get_prior_row(dst_cursor, level);
    char *row_buf = NULL;
    row_assist_t ra;
    status_t status = OG_ERROR;

    OGSQL_SAVE_STACK(stmt);
    OG_RETURN_IFERR(sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&row_buf));

    do {
        if (sql_connect_mtrl_make_prior_row(stmt, dst_cursor, row_buf, &ra) != OG_SUCCESS) {
            break;
        }
        if (IS_VALID_MTRL_ROWID(*slot)) {
            if (vmctx_free(GET_VM_CTX(stmt), slot) != OG_SUCCESS) {
                break;
            }
            *slot = g_invalid_entry;
        }
        status = vmctx_insert(GET_VM_CTX(stmt), (const char *)row_buf, ra.head->size, slot);
    } while (0);

    OGSQL_RESTORE_STACK(stmt);
    return status;
}
/* Closes the VM context page addressed by `mtrl_rowid`, unless its vmid is invalid. */
static inline void sql_connect_mtrl_close_vmctx_page(sql_stmt_t *stmt, mtrl_rowid_t *mtrl_rowid)
{
    if (mtrl_rowid->vmid == OG_INVALID_ID32) {
        return;
    }
    vmctx_close_row_id(GET_VM_CTX(stmt), mtrl_rowid);
}
/*
 * Checks whether the row currently loaded in `dst_cursor` repeats an ancestor on the
 * current path. The PRIOR-expression row of `dst_cursor` is compared with the prior rows
 * stored for levels (current level - 1) down to 1.
 *
 * `*is_cycle` is always written: it starts as OG_FALSE and becomes OG_TRUE on the first
 * equal row. When a cycle is found and the query was not declared NOCYCLE,
 * ERR_CONNECT_BY_LOOP is raised and OG_ERROR is returned.
 *
 * At most one VM page is held open at a time; it is only re-opened when the next prior row
 * lives in a different page.
 */
static status_t sql_connect_mtrl_check_iscycle(sql_stmt_t *stmt, sql_cursor_t *cursor, sql_cursor_t *dst_cursor,
                                               bool32 *is_cycle)
{
    char *lbuf = NULL;
    char *rbuf = NULL;
    uint32 level = sql_connect_mtrl_get_level(cursor);
    row_assist_t ra;
    mtrl_rowid_t cur_mtrl_rowid = g_invalid_entry;
    vm_page_t *opened_page = NULL;

    /* Never leave the output stale, even if no ancestor level is compared. */
    *is_cycle = OG_FALSE;

    OGSQL_SAVE_STACK(stmt);
    OG_RETURN_IFERR(sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&lbuf));
    if (sql_connect_mtrl_make_prior_row(stmt, dst_cursor, lbuf, &ra) != OG_SUCCESS) {
        OGSQL_RESTORE_STACK(stmt);
        return OG_ERROR;
    }

    for (uint32 i = level - 1; i > 0; i--) {
        mtrl_rowid_t *rowid = sql_connect_mtrl_get_prior_row(cursor, i);
        if (cur_mtrl_rowid.vmid != rowid->vmid) {
            /* Switch pages: release the one held so far, then open the new one. */
            sql_connect_mtrl_close_vmctx_page(stmt, &cur_mtrl_rowid);
            cur_mtrl_rowid.vmid = rowid->vmid;
            if (vm_open(GET_VM_CTX(stmt)->session, GET_VM_CTX(stmt)->pool, rowid->vmid, &opened_page) != OG_SUCCESS) {
                OG_THROW_ERROR_EX(ERR_VM, "failed to open row id vm id %u, vm slot %u", rowid->vmid, rowid->slot);
                OGSQL_RESTORE_STACK(stmt);
                return OG_ERROR;
            }
        }
        rbuf = VMCTX_GET_DATA(opened_page, rowid);
        *is_cycle = cm_row_equal(lbuf, rbuf);
        if (*is_cycle) {
            if (!cursor->query->connect_by_nocycle) {
                sql_connect_mtrl_close_vmctx_page(stmt, &cur_mtrl_rowid);
                OG_THROW_ERROR(ERR_CONNECT_BY_LOOP);
                OGSQL_RESTORE_STACK(stmt);
                return OG_ERROR;
            }
            break;
        }
    }

    sql_connect_mtrl_close_vmctx_page(stmt, &cur_mtrl_rowid);
    OGSQL_RESTORE_STACK(stmt);
    return OG_SUCCESS;
}
/*
 * Fetches the next hash-table entry through `iter` and evaluates the CONNECT BY condition.
 * `*eof` reports the end of the scan. `*result` holds the condition outcome; it stays
 * OG_FALSE at end of scan or when the plan has no CONNECT BY condition.
 */
static inline status_t sql_connect_mtrl_fetch_data(sql_stmt_t *stmt, sql_cursor_t *cursor, hash_table_iter_t *iter,
                                                   bool32 *result, bool32 *eof)
{
    *eof = OG_FALSE;
    *result = OG_FALSE;

    OG_RETURN_IFERR(vm_hash_table_fetch(eof, CB_MTRL_SEGMENT, CB_MTRL_TABLE_ENTRY, iter));

    if (!(*eof) && CB_MTRL_PLAN->connect_by_cond != NULL) {
        return sql_match_cond_node(stmt, CB_MTRL_PLAN->connect_by_cond->root, result);
    }
    return OG_SUCCESS;
}
/*
 * Links the context's next-level cursor below `curr_cursor` and stamps it with `level`
 * and a rownum one greater than that of `curr_cursor`.
 */
static inline void sql_connect_mtrl_init_next_cursor(sql_cursor_t *cursor, sql_cursor_t *curr_cursor, uint32 level)
{
    sql_cursor_t *child = CB_MTRL_NEXT_CURSOR;

    curr_cursor->connect_data.next_level_cursor = child;
    child->connect_data.last_level_cursor = curr_cursor;
    child->connect_data.level = level;
    child->rownum = curr_cursor->rownum + 1;
}
/*
 * Finds the first child at `level` that satisfies the CONNECT BY condition and does not
 * close a cycle. Before each fetch the iterator position is remembered in the level's
 * level_entry. `*eof` is set when the scan is exhausted; `*last_is_cycle` is set to
 * OG_TRUE whenever a candidate was skipped because it closed a cycle.
 */
static status_t sql_connect_mtrl_get_next_level(sql_stmt_t *stmt, sql_cursor_t *cursor, uint32 level, bool32 *eof,
                                                bool32 *last_is_cycle)
{
    bool32 matched = OG_FALSE;
    bool32 cycle = OG_FALSE;
    cb_mtrl_data_t *level_data = sql_connect_mtrl_get_cb_data(cursor, level);
    hash_table_iter_t *level_iter = sql_connect_mtrl_get_iter(cursor, level);

    OG_RETURN_IFERR(sql_connect_mtrl_get_first_entry(stmt, cursor, level_iter));

    for (;;) {
        level_data->level_entry = level_iter->curr_match;
        OG_RETURN_IFERR(sql_connect_mtrl_fetch_data(stmt, cursor, level_iter, &matched, eof));
        if (*eof) {
            break;
        }
        if (!matched) {
            continue;
        }
        OG_RETURN_IFERR(sql_connect_mtrl_check_iscycle(stmt, cursor, CB_MTRL_NEXT_CURSOR, &cycle));
        if (!cycle) {
            break;
        }
        *last_is_cycle = OG_TRUE;
    }
    return OG_SUCCESS;
}
/*
 * Descends from the current-level cursor to `level`.
 *
 * The next-level helper cursor is created on demand and linked below the current one, the
 * level counter is incremented, and the bookkeeping record for `level` is set up. The
 * PRIOR row of the current cursor is stored for `level - 1`, then the first qualifying
 * child is looked up.
 *
 * When no child exists the level is removed again and the current cursor is marked as a
 * leaf. Otherwise the iterator is rewound to the entry remembered before the successful
 * fetch. NOTE(review): presumably this lets a later fetch at this level deliver that same
 * child again - confirm against sql_connect_mtrl_fetch_curr_cursor.
 */
static status_t sql_connect_mtrl_execute_next_level(sql_stmt_t *stmt, sql_cursor_t *cursor, uint32 level)
{
cb_mtrl_data_t *cb_mtrl_data = NULL;
bool32 eof = OG_FALSE;
hash_table_iter_t *iter_next = NULL;
bool32 is_cycle = OG_FALSE;
sql_cursor_t *curr_cursor = cursor->connect_data.cur_level_cursor;
OG_RETURN_IFERR(sql_connect_mtrl_get_cursor(stmt, cursor, &CB_MTRL_NEXT_CURSOR));
sql_connect_mtrl_init_next_cursor(cursor, curr_cursor, level);
/* The level counter is raised before the record is allocated; it is not lowered on the error returns below. */
sql_connect_mtrl_add_level(cursor);
OG_RETURN_IFERR(sql_connect_mtrl_alloc_cb_data(cursor, level, &cb_mtrl_data));
sql_connect_mtrl_init_next_data(cursor, cb_mtrl_data);
iter_next = &(cb_mtrl_data->iter);
iter_next->callback_ctx = CB_MTRL_NEXT_CURSOR;
/* Remember the parent's PRIOR values; sql_connect_mtrl_check_iscycle compares against them. */
OG_RETURN_IFERR(sql_connect_mtrl_insert_prior_row(stmt, curr_cursor, level - 1));
OG_RETURN_IFERR(sql_connect_mtrl_push_cursor(stmt, CB_MTRL_NEXT_CURSOR));
if (sql_connect_mtrl_get_next_level(stmt, cursor, level, &eof, &is_cycle) != OG_SUCCESS) {
sql_connect_mtrl_pop_cursor(stmt, CB_MTRL_NEXT_CURSOR);
return OG_ERROR;
}
sql_connect_mtrl_pop_cursor(stmt, CB_MTRL_NEXT_CURSOR);
/* At least one candidate child was skipped because it closed a cycle. */
if (is_cycle) {
curr_cursor->connect_data.connect_by_iscycle = OG_TRUE;
}
if (eof) {
/* No child: undo the level and flag the current row as a leaf. */
OG_RETURN_IFERR(sql_connect_mtrl_delete_level(stmt, cursor, level));
curr_cursor->connect_data.next_level_cursor = NULL;
curr_cursor->connect_data.connect_by_isleaf = OG_TRUE;
return OG_SUCCESS;
}
/* Rewind the iterator to the position saved before the successful fetch. */
iter_next->curr_match = cb_mtrl_data->level_entry;
cb_mtrl_data->level_entry.vmid = OG_INVALID_ID32;
curr_cursor->connect_data.connect_by_isleaf = OG_FALSE;
return OG_SUCCESS;
}
/*
 * Opens level 1: prepares the join row addresses of `cursor`, enters level 1 and opens a
 * full scan over the hash table with the level-1 iterator. The context's scratch iterator
 * is initialized as a copy of that iterator, switched to key-scan mode.
 */
static status_t sql_connect_mtrl_open_first_cursor(sql_stmt_t *stmt, sql_cursor_t *cursor)
{
    hash_scan_assist_t full_scan = { HASH_FULL_SCAN, NULL, 0 };
    cb_mtrl_data_t *root_data = NULL;
    hash_table_iter_t *root_iter = NULL;
    bool32 found = OG_FALSE;

    OG_RETURN_IFERR(sql_connect_mtrl_init_cursor(stmt, cursor));

    sql_connect_mtrl_add_level(cursor);
    OG_RETURN_IFERR(sql_connect_mtrl_alloc_cb_data(cursor, 1, &root_data));
    root_data->prior_row = g_invalid_entry;
    root_data->level_entry.vmid = OG_INVALID_ID32;

    root_iter = &root_data->iter;
    sql_init_hash_iter(root_iter, cursor);
    OG_RETURN_IFERR(vm_hash_table_open(CB_MTRL_SEGMENT, CB_MTRL_TABLE_ENTRY, &full_scan, &found, root_iter));

    MEMS_RETURN_IFERR(memcpy_sp(CB_MTRL_TEMP_ITER, sizeof(hash_table_iter_t), root_iter, sizeof(hash_table_iter_t)));
    CB_MTRL_TEMP_ITER->scan_mode = HASH_KEY_SCAN;
    return OG_SUCCESS;
}
/*
 * Opens the scan: when a current-level cursor is set, the second level is executed below
 * it; otherwise the first-level scan is opened.
 */
static inline status_t sql_connect_mtrl_open_cursor(sql_stmt_t *stmt, sql_cursor_t *cursor)
{
    if (cursor->connect_data.cur_level_cursor != NULL) {
        return sql_connect_mtrl_execute_next_level(stmt, cursor, CB_MTRL_SECOND_LEVEL);
    }
    return sql_connect_mtrl_open_first_cursor(stmt, cursor);
}
/*
 * Executes the connect-by materialization plan node.
 *
 * On the first call for `cursor` (no context yet) the context is allocated and the hash
 * table is built with a temporary query cursor, which is freed again whether or not the
 * build succeeded. In that first-call path, if a current-level cursor is already set, the
 * first-level scan is opened before the regular open step runs.
 */
status_t sql_execute_connect_mtrl(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
    sql_cursor_t *query_cur = NULL;
    status_t build_status;

    if (cursor->cb_mtrl_ctx != NULL) {
        return sql_connect_mtrl_open_cursor(stmt, cursor);
    }

    OG_RETURN_IFERR(sql_alloc_connect_mtrl_ctx(stmt, cursor, plan));
    OG_RETURN_IFERR(sql_alloc_cursor(stmt, &query_cur));

    build_status = sql_connect_mtrl_build(stmt, cursor, query_cur, plan);
    sql_free_cursor(stmt, query_cur);
    if (build_status != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (cursor->connect_data.cur_level_cursor != NULL) {
        OG_RETURN_IFERR(sql_connect_mtrl_open_first_cursor(stmt, cursor));
    }
    return sql_connect_mtrl_open_cursor(stmt, cursor);
}
/*
 * Fetches the next level-1 row: scans the hash table with the level-1 iterator until an
 * entry satisfies the START WITH condition (every entry qualifies when the plan has none).
 * Reports `*eof` immediately when the table is empty. If the hash fetch itself fails, the
 * iterator's current match is invalidated.
 */
static status_t sql_connect_mtrl_fetch_first_cursor(sql_stmt_t *stmt, sql_cursor_t *cursor, bool32 *eof)
{
    bool32 matched = OG_FALSE;
    hash_table_iter_t *root_iter = sql_connect_mtrl_get_iter(cursor, 1);

    if (CB_MTRL_CONTEXT(cursor)->empty) {
        *eof = OG_TRUE;
        return OG_SUCCESS;
    }

    do {
        if (vm_hash_table_fetch(eof, CB_MTRL_SEGMENT, CB_MTRL_TABLE_ENTRY, root_iter) != OG_SUCCESS) {
            root_iter->curr_match.vmid = OG_INVALID_ID32;
            return OG_ERROR;
        }
        if (*eof || CB_MTRL_PLAN->start_with_cond == NULL) {
            break;
        }
        OG_RETURN_IFERR(sql_match_cond_node(stmt, CB_MTRL_PLAN->start_with_cond->root, &matched));
    } while (!matched);

    return OG_SUCCESS;
}
/*
 * Builds the probe key in `buf` from the plan's PRIOR expressions, using the key types
 * stored in the context. `*has_null` is filled in by sql_make_hash_key().
 */
static inline status_t sql_make_connect_mtrl_scan_key(sql_stmt_t *stmt, sql_cursor_t *cursor, cb_mtrl_plan_t *cb_mtrl,
                                                      char *buf, bool32 *has_null)
{
    row_assist_t key_ra;
    cb_mtrl_ctx_t *ogx = CB_MTRL_CONTEXT(cursor);

    return sql_make_hash_key(stmt, &key_ra, buf, cb_mtrl->prior_exprs, ogx->key_types, has_null);
}
/*
 * Reloads the parent row of `level` into the context's last-level helper cursor, using the
 * level entry remembered for `level - 1`. Nothing is done for the second level.
 */
static status_t sql_connect_mtrl_build_last_cursor(sql_stmt_t *stmt, sql_cursor_t *cursor, uint32 level)
{
    cb_mtrl_data_t *parent_data = NULL;

    if (level == CB_MTRL_SECOND_LEVEL) {
        return OG_SUCCESS;
    }

    parent_data = sql_connect_mtrl_get_cb_data(cursor, level - 1);
    OG_RETURN_IFERR(sql_connect_mtrl_get_cursor(stmt, cursor, &CB_MTRL_LAST_CURSOR));
    return sql_connect_mtrl_build_cursor(cursor, CB_MTRL_LAST_CURSOR, &parent_data->level_entry);
}
/*
 * Fetches the next qualifying row of `level` into the context's current-level cursor.
 * Before each fetch the iterator position is stored in the level's level_entry. Rows that
 * fail the CONNECT BY condition are skipped. The cycle check only runs when the level
 * entry was valid on entry to this function; rows found to close a cycle are skipped.
 * The current-level cursor is pushed for the duration of the scan and popped on every exit.
 */
static status_t sql_connect_mtrl_fetch_curr_cursor(sql_stmt_t *stmt, sql_cursor_t *cursor, uint32 level, bool32 *eof)
{
    status_t status = OG_SUCCESS;
    bool32 cycle = OG_FALSE;
    bool32 matched = OG_FALSE;
    hash_entry_t *entry = &(sql_connect_mtrl_get_cb_data(cursor, level))->level_entry;
    hash_entry_t entry_on_entry = *entry;
    hash_table_iter_t *iter = sql_connect_mtrl_get_iter(cursor, level);

    iter->callback_ctx = CB_MTRL_CURR_CURSOR;
    OG_RETURN_IFERR(sql_connect_mtrl_push_cursor(stmt, CB_MTRL_CURR_CURSOR));

    for (;;) {
        *entry = iter->curr_match;
        if (sql_connect_mtrl_fetch_data(stmt, cursor, iter, &matched, eof) != OG_SUCCESS) {
            status = OG_ERROR;
            break;
        }
        if (*eof) {
            break;
        }
        if (!matched) {
            continue;
        }
        if (entry_on_entry.vmid != OG_INVALID_ID32 &&
            sql_connect_mtrl_check_iscycle(stmt, cursor, CB_MTRL_CURR_CURSOR, &cycle) != OG_SUCCESS) {
            status = OG_ERROR;
            break;
        }
        if (!cycle) {
            break;
        }
    }

    sql_connect_mtrl_pop_cursor(stmt, CB_MTRL_CURR_CURSOR);
    return status;
}
/*
 * Determines the connect_by_iscycle flag of the current-level cursor by looking ahead at
 * the remaining children at `level`. Skipped when the cursor is already a leaf or already
 * flagged. One entry is fetched up front, then the remaining qualifying entries are checked
 * for cycles. On success the level's iterator is put back to the state it had on entry; on
 * failure it is left as is.
 */
static status_t sql_connect_mtrl_set_iscycle(sql_stmt_t *stmt, sql_cursor_t *cursor, uint32 level)
{
    sql_cursor_t *curr_cursor = cursor->connect_data.cur_level_cursor;

    if (curr_cursor->connect_data.connect_by_isleaf || curr_cursor->connect_data.connect_by_iscycle) {
        return OG_SUCCESS;
    }

    bool32 cycle = OG_FALSE;
    bool32 matched = OG_FALSE;
    bool32 scan_done = OG_FALSE;
    bool32 failed = OG_FALSE;

    OG_RETURN_IFERR(sql_connect_mtrl_get_cursor(stmt, cursor, &CB_MTRL_NEXT_CURSOR));
    sql_connect_mtrl_init_next_cursor(cursor, curr_cursor, level);

    cb_mtrl_data_t *level_data = sql_connect_mtrl_get_cb_data(cursor, level);
    hash_table_iter_t *probe = &level_data->iter;
    hash_table_iter_t saved_iter = *probe;

    probe->callback_ctx = CB_MTRL_NEXT_CURSOR;
    OG_RETURN_IFERR(sql_connect_mtrl_push_cursor(stmt, CB_MTRL_NEXT_CURSOR));

    /* Fetch once before the look-ahead loop starts. */
    if (vm_hash_table_fetch(&scan_done, CB_MTRL_SEGMENT, CB_MTRL_TABLE_ENTRY, probe) != OG_SUCCESS) {
        failed = OG_TRUE;
    }

    while (!failed) {
        if (sql_connect_mtrl_fetch_data(stmt, cursor, probe, &matched, &scan_done) != OG_SUCCESS) {
            failed = OG_TRUE;
            break;
        }
        if (scan_done) {
            break;
        }
        if (!matched) {
            continue;
        }
        if (sql_connect_mtrl_check_iscycle(stmt, cursor, CB_MTRL_NEXT_CURSOR, &cycle) != OG_SUCCESS) {
            failed = OG_TRUE;
            break;
        }
        if (cycle) {
            curr_cursor->connect_data.connect_by_iscycle = OG_TRUE;
            break;
        }
    }

    if (!failed) {
        level_data->iter = saved_iter;
    }
    sql_connect_mtrl_pop_cursor(stmt, CB_MTRL_NEXT_CURSOR);
    return failed ? OG_ERROR : OG_SUCCESS;
}
/*
 * Links the context's current-level cursor below its parent and resets its per-row state.
 * The parent is the last-level helper cursor for levels above the second level, and
 * `cursor` itself otherwise.
 */
static inline void sql_connect_mtrl_init_curr_cursor(sql_stmt_t *stmt, sql_cursor_t *cursor, uint32 level)
{
    sql_cursor_t *parent = cursor;
    sql_cursor_t *current = CB_MTRL_CURR_CURSOR;

    if (level > CB_MTRL_SECOND_LEVEL) {
        parent = CB_MTRL_LAST_CURSOR;
    }

    parent->connect_data.next_level_cursor = current;
    current->connect_data.last_level_cursor = parent;
    current->connect_data.level = level;
    current->connect_data.connect_by_isleaf = OG_FALSE;
    current->connect_data.connect_by_iscycle = OG_FALSE;
    current->rownum = cursor->rownum;
}
/*
 * Produces the next row of `level` in the current-level cursor.
 * `*level_eof` is set right away when the level's iterator has no valid current match.
 * The parent row is reloaded first unless the current-level cursor is already at `level`
 * and flagged as a leaf.
 */
static status_t sql_connect_mtrl_get_curr_data(sql_stmt_t *stmt, sql_cursor_t *cursor, uint32 level, bool32 *level_eof)
{
    hash_table_iter_t *level_iter = sql_connect_mtrl_get_iter(cursor, level);

    if (level_iter->curr_match.vmid == OG_INVALID_ID32) {
        *level_eof = OG_TRUE;
        return OG_SUCCESS;
    }

    bool32 leaf_at_level = (CB_MTRL_CURR_CURSOR->connect_data.level == level) &&
                           CB_MTRL_CURR_CURSOR->connect_data.connect_by_isleaf;
    if (!leaf_at_level) {
        OG_RETURN_IFERR(sql_connect_mtrl_build_last_cursor(stmt, cursor, level));
    }

    cursor->connect_data.cur_level_cursor = NULL;
    sql_connect_mtrl_init_curr_cursor(stmt, cursor, level);
    return sql_connect_mtrl_fetch_curr_cursor(stmt, cursor, level, level_eof);
}
/* Pops one element from each path stack; there is one stack per entry in path_func_nodes. */
static inline void sql_connect_mtrl_reset_path(sql_cursor_t *cursor)
{
    uint32 stack_count = cursor->connect_data.path_func_nodes->count;

    for (uint32 idx = 0; idx < stack_count; idx++) {
        cm_pop(&cursor->connect_data.path_stack[idx]);
    }
}
/*
 * Fetches the next row at `level`.
 *
 * When the level still has a row, that row becomes the current-level cursor, the level
 * below it is prepared and - for queries using both connect_by_iscycle and
 * connect_by_prior - its cycle flag is computed. When the level is exhausted it is removed,
 * the path stacks are popped, and the search continues one level up; exhausting the second
 * level sets `*eof`.
 */
static status_t sql_connect_mtrl_fetch_next_level(sql_stmt_t *stmt, sql_cursor_t *cursor, uint32 level, bool32 *eof)
{
    bool32 level_eof = OG_FALSE;

    if (sql_stack_safe(stmt) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (sql_connect_mtrl_get_curr_data(stmt, cursor, level, &level_eof) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (level_eof) {
        if (sql_connect_mtrl_delete_level(stmt, cursor, level) != OG_SUCCESS) {
            return OG_ERROR;
        }
        sql_connect_mtrl_reset_path(cursor);

        if (level == CB_MTRL_SECOND_LEVEL) {
            *eof = OG_TRUE;
            return OG_SUCCESS;
        }
        if (level > CB_MTRL_SECOND_LEVEL) {
            /* Backtrack to the parent level. */
            return sql_connect_mtrl_fetch_next_level(stmt, cursor, level - 1, eof);
        }
        return OG_SUCCESS;
    }

    cursor->connect_data.cur_level_cursor = CB_MTRL_CURR_CURSOR;
    if (sql_connect_mtrl_execute_next_level(stmt, cursor, level + 1) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (cursor->query->connect_by_iscycle && cursor->query->connect_by_prior &&
        sql_connect_mtrl_set_iscycle(stmt, cursor, level + 1) != OG_SUCCESS) {
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Positions `iter` on the first hash-table entry whose key equals the PRIOR-expression key
 * of the row currently in scope. The iterator's current match is marked invalid when the
 * key contains a NULL or when no entry is found. The key buffer is taken from the
 * statement stack, which is restored before returning.
 */
static status_t sql_connect_mtrl_get_first_entry(sql_stmt_t *stmt, sql_cursor_t *cursor, hash_table_iter_t *iter)
{
    char *key_buf = NULL;
    bool32 found = OG_FALSE;
    bool32 has_null = OG_FALSE;
    hash_scan_assist_t probe;
    status_t status = OG_ERROR;

    OGSQL_SAVE_STACK(stmt);
    OG_RETURN_IFERR(sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&key_buf));

    do {
        if (sql_make_connect_mtrl_scan_key(stmt, cursor, CB_MTRL_PLAN, key_buf, &has_null) != OG_SUCCESS) {
            break;
        }
        if (has_null) {
            iter->curr_match.vmid = OG_INVALID_ID32;
            status = OG_SUCCESS;
            break;
        }

        probe.scan_mode = HASH_KEY_SCAN;
        probe.buf = key_buf;
        probe.size = ((row_head_t *)key_buf)->size;
        if (vm_hash_table_open(CB_MTRL_SEGMENT, CB_MTRL_TABLE_ENTRY, &probe, &found, iter) != OG_SUCCESS) {
            break;
        }
        if (!found) {
            iter->curr_match.vmid = OG_INVALID_ID32;
        }
        status = OG_SUCCESS;
    } while (0);

    OGSQL_RESTORE_STACK(stmt);
    return status;
}
/*
 * Fetches the next row of the connect-by materialization plan node.
 *
 * At level 1 the first-level scan is advanced; at deeper levels the current-level helper
 * cursor is created on demand and the hierarchy walk continues. `*eof` is set by the
 * callee when no more rows exist.
 *
 * Every path, including a failure to obtain the helper cursor, reaches CM_TRACE_END so the
 * trace begin/end pair stays balanced.
 */
status_t sql_fetch_connect_mtrl(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan, bool32 *eof)
{
    status_t status;
    CM_TRACE_BEGIN;
    uint32 level = sql_connect_mtrl_get_level(cursor);

    if (level == 1) {
        status = sql_connect_mtrl_fetch_first_cursor(stmt, cursor, eof);
    } else {
        status = sql_connect_mtrl_get_cursor(stmt, cursor, &CB_MTRL_CURR_CURSOR);
        if (status == OG_SUCCESS) {
            status = sql_connect_mtrl_fetch_next_level(stmt, cursor, level, eof);
        }
    }

    CM_TRACE_END(stmt, plan->plan_id);
    return status;
}