* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* dml_executor.h
*
*
* IDENTIFICATION
* src/ogsql/executor/dml_executor.h
*
* -------------------------------------------------------------------------
*/
#ifndef __DML_EXECUTOR_H__
#define __DML_EXECUTOR_H__
#include "plan_range.h"
#include "ogsql_plan.h"
#include "ogsql_stmt.h"
#include "ogsql_cond.h"
#include "cm_bilist.h"
#include "ogsql_vm_hash_table.h"
#include "ogsql_table_func.h"
#include "ogsql_btree.h"
typedef enum en_key_set_type {
KEY_SET_FULL,
KEY_SET_EMPTY,
KEY_SET_NORMAL,
} key_set_type_t;
typedef struct st_key_range_t {
char *l_key;
char *r_key;
bool32 is_equal;
} key_range_t;
typedef struct st_key_set_t {
void *key_data;
uint32 offset;
key_set_type_t type;
} key_set_t;
#ifdef OG_RAC_ING
typedef struct st_participant_node {
uint16 stmt_index;
bool8 eof;
uint8 reserved[1];
struct st_participant_node *next;
} participant_node_t;
typedef struct st_merge_group_property {
participant_node_t need_fetch_head;
participant_node_t head;
uint16 need_fetch_stmt_index[M_SLOT_STMT_LIST_COUNT];
uint16 stmt_index_count;
participant_node_t *pnode;
} merge_group_property_t;
static inline participant_node_t *sql_remote_get_merge_group_participant_list(sql_stmt_t *ogsql_stmt,
merge_group_property_t *mergegrp_property)
{
if (mergegrp_property->pnode == NULL) {
OG_THROW_ERROR(ERR_ASSERT_ERROR, "mergegrp_property->pnode == NULL");
}
return mergegrp_property->pnode;
}
typedef struct st_merge_fetch_property {
uint16 participants_count;
uint16 alive_count;
char *buf;
* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* | participant_node_t array | loser tree(i.e., int32 array) |
* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
*
* refer to sql_remote_get_merge_sort_losertree() and sql_remote_get_merge_sort_participant() for details
*/
} merge_fetch_property_t;
static inline int32 *sql_remote_get_merge_sort_losertree(sql_stmt_t *ogsql_stmt, merge_fetch_property_t
*mergesort_property)
{
if (mergesort_property->buf == NULL) {
OG_THROW_ERROR_EX(ERR_ASSERT_ERROR, "mergesort_property->buf == NULL");
}
return (int32 *)(mergesort_property->buf + mergesort_property->participants_count * sizeof(participant_node_t));
}
typedef struct st_remote_cursor {
bool32 eof;
remote_fetcher_type_t method_type;
struct {
union {
merge_fetch_property_t merge_sort_property;
merge_group_property_t merge_group_property;
};
};
uint16 stmt_count;
uint16 current_stmt_index;
node_slot_stmt_t *slot_stmt_list[M_SLOT_STMT_LIST_COUNT];
group_list_t group_id_list;
bool32 direct_fetch;
bool32 is_group_changed;
uint32 column_count;
row_head_t *row;
uint16 *offsets;
uint16 *lens;
rowid_t rowid;
bool8 is_mtrl_data;
uint16 rownodeid;
char buf[0];
} remote_cursor_t;
#endif
typedef struct st_mps_knlcur {
knl_cursor_t *knl_cursor;
uint32 offset;
} mps_knlcur_t;
typedef struct st_mps_sort {
uint32 count;
uint32 sort_array_length;
uint32 *sort_array;
} mps_sort_t;
typedef struct st_multi_parts_scan_ctx {
galist_t *knlcur_list;
mps_sort_t *sort_info;
uint32 knlcur_id;
bool32 stop_index_key;
} mps_ctx_t;
typedef struct st_sql_table_cursor {
sql_table_t *table;
union {
knl_cursor_t *knl_cur;
struct st_sql_cursor *sql_cur;
};
knl_cursor_action_t action;
knl_scan_mode_t scan_mode;
union {
struct {
key_set_t key_set;
key_set_t part_set;
part_scan_key_t curr_part;
part_scan_key_t curr_subpart;
uint32 part_scan_index;
mps_ctx_t multi_parts_info;
};
json_table_exec_t json_table_exec;
};
bool32 hash_table;
tf_scan_flag_t scan_flag;
struct st_par_scan_range range;
uint64 scn;
vmc_t vmc;
} sql_table_cursor_t;
#define PENDING_HEAD_SIZE sizeof(uint32)
typedef struct st_mtrl_sibl_sort {
uint32 sid;
uint32 cursor_sid;
} mtrl_sibl_sort_t;
typedef struct st_sql_mtrl_handler {
mtrl_cursor_t cursor;
mtrl_resource_t rs;
mtrl_resource_t predicate;
mtrl_resource_t query_block;
mtrl_resource_t outline;
mtrl_resource_t note;
mtrl_resource_t sort;
uint32 aggr;
uint32 aggr_str;
uint32 sort_seg;
uint32 for_update;
mtrl_resource_t group;
uint32 group_index;
uint32 distinct;
uint32 index_distinct;
bool32 aggr_fetched;
mtrl_resource_t winsort_rs;
mtrl_resource_t winsort_aggr;
mtrl_resource_t winsort_aggr_ext;
mtrl_resource_t winsort_sort;
uint32 hash_table_rs;
mtrl_savepoint_t save_point;
mtrl_sibl_sort_t sibl_sort;
} sql_mtrl_handler_t;
typedef struct st_union_all_data {
uint32 pos;
} union_all_data_t;
typedef struct st_minus_data {
bool32 r_continue_fetch;
uint32 rs_vmid;
uint32 rnums;
} minus_data_t;
typedef struct st_limit_data {
uint64 fetch_row_count;
uint64 limit_count;
uint64 limit_offset;
} limit_data_t;
typedef struct st_connect_by_data {
struct st_sql_cursor *next_level_cursor;
struct st_sql_cursor *last_level_cursor;
struct st_sql_cursor *first_level_cursor;
struct st_sql_cursor *cur_level_cursor;
bool32 connect_by_isleaf;
bool32 connect_by_iscycle;
uint32 level;
uint32 first_level_rownum;
galist_t *path_func_nodes;
galist_t *prior_exprs;
cm_stack_t *path_stack;
} connect_by_data_t;
typedef struct st_join_data {
struct st_sql_cursor *left;
struct st_sql_cursor *right;
} join_data_t;
typedef struct st_group_data {
uint32 curr_group;
uint32 group_count;
group_plan_t *group_p;
} group_data_t;
typedef struct st_cube_data {
struct st_sql_cursor *fetch_cursor;
struct st_sql_cursor *group_cursor;
galist_t *sets;
galist_t *nodes;
galist_t *plans;
biqueue_t curs_que;
cube_node_t **maps;
plan_node_t *fetch_plan;
} cube_data_t;
typedef struct st_hash_join_anchor {
uint32 slot;
uint32 batch_cnt;
bool32 eof;
} hash_join_anchor_t;
typedef struct st_merge_group_data {
bool32 eof;
} merge_group_data_t;
typedef struct st_outer_join_data {
bool8 need_reset_right;
bool8 right_matched;
bool8 need_swap_driver;
bool8 left_empty;
plan_node_t *right_plan;
plan_node_t *left_plan;
cond_tree_t *cond;
cond_tree_t *filter;
struct st_nl_full_opt_ctx *nl_full_opt_ctx;
} outer_join_data_t;
typedef struct st_inner_join_data {
bool32 right_fetched;
} inner_join_data_t;
typedef struct st_nl_batch_data {
bool32 last_batch;
struct st_sql_cursor *cache_cur;
} nl_batch_data_t;
typedef struct st_hash_right_semi_data {
uint32 total_rows;
uint32 deleted_rows;
bool32 is_first;
} hash_right_semi_t;
typedef struct st_plan_exec_data {
limit_data_t *query_limit;
limit_data_t *select_limit;
union_all_data_t *union_all;
minus_data_t minus;
uint32 unpivot_row;
uint32 *expl_col_max_size;
uint32 *qb_col_max_size;
outer_join_data_t *outer_join;
inner_join_data_t *inner_join;
row_addr_t *join;
char *aggr_dis;
char *select_view;
char *tab_parallel;
group_data_t *group;
cube_data_t *group_cube;
nl_batch_data_t *nl_batch;
galist_t *index_scan_range_ar;
galist_t *part_scan_range_ar;
text_buf_t sort_concat;
knl_cursor_t *ext_knl_cur;
hash_right_semi_t *right_semi;
char *dv_plan_buf;
} plan_exec_data_t;
typedef enum e_hash_table_oper_type {
OPER_TYPE_INSERT = 1,
OPER_TYPE_FETCH = 2
} hash_table_opertype_t;
typedef enum en_group_type {
HASH_GROUP_TYPE,
SORT_GROUP_TYPE,
HASH_GROUP_PIVOT_TYPE,
HASH_GROUP_PAR_TYPE,
} group_type_t;
typedef struct st_concate_ctx {
uint32 id;
uint32 vmid;
char *buf;
galist_t *keys;
galist_t *sub_plans;
plan_node_t *curr_plan;
hash_segment_t hash_segment;
hash_table_entry_t hash_table;
hash_table_iter_t iter;
} concate_ctx_t;
typedef enum en_group_by_phase {
GROUP_BY_INIT,
GROUP_BY_PARALLEL,
GROUP_BY_COLLECT,
GROUP_BY_END
} group_by_phase_t;
typedef struct st_group_ctx {
group_type_t type;
hash_table_opertype_t oper_type;
sql_stmt_t *stmt;
group_plan_t *group_p;
expr_node_t **aggr_node;
handle_t cursor;
uint32 vm_id;
uint32 listagg_page;
text_buf_t concat_data;
char **concat_typebuf;
mtrl_segment_t extra_data;
uint32 *str_aggr_pages;
uint32 str_aggr_page_count;
group_by_phase_t group_by_phase;
uint32 key_card;
variant_t *str_aggr_val;
hash_segment_t hash_segment;
hash_table_entry_t *hash_dist_tables;
union {
struct {
bool32 empty;
struct {
hash_segment_t *hash_segment_par;
bool32 *empty_par;
uint32 par_hash_tab_count;
};
hash_table_entry_t group_hash_table;
hash_scan_assist_t group_hash_scan_assit;
hash_table_entry_t *hash_tables;
hash_table_iter_t *iters;
};
struct {
sql_btree_segment_t btree_seg;
sql_btree_cursor_t btree_cursor;
};
};
char *row_buf;
char *aggr_buf;
uint32 row_buf_len;
uint32 aggr_buf_len;
} group_ctx_t;
typedef enum en_distinct_type {
HASH_DISTINCT,
SORT_DISTINCT,
HASH_UNION
} distinct_type_t;
typedef struct st_distinct_ctx {
distinct_type_t type;
distinct_plan_t *distinct_p;
union {
struct {
hash_segment_t hash_segment;
hash_table_entry_t hash_table_entry;
};
struct {
sql_btree_segment_t btree_seg;
sql_btree_cursor_t btree_cursor;
};
};
} distinct_ctx_t;
typedef struct st_unpivot_ctx {
uint32 row_buf_len;
char *row_buf;
} unpivot_ctx_t;
typedef struct st_hash_mtrl_ctx {
group_ctx_t group_ctx;
char *aggrs;
uint32 aggr_id;
bool32 fetched;
og_type_t *key_types;
} hash_mtrl_ctx_t;
typedef struct st_connect_by_mtrl_data {
hash_entry_t level_entry;
hash_table_iter_t iter;
mtrl_rowid_t prior_row;
} cb_mtrl_data_t;
typedef struct st_connect_by_mtrl_ctx {
cb_mtrl_plan_t *cb_mtrl_p;
uint32 vmid;
bool32 empty;
uint32 curr_level;
uint32 hash_table_rs;
hash_segment_t hash_segment;
hash_table_entry_t hash_table;
hash_table_iter_t iter;
galist_t *cb_data;
sql_cursor_t *last_cursor;
sql_cursor_t *curr_cursor;
sql_cursor_t *next_cursor;
og_type_t *key_types;
} cb_mtrl_ctx_t;
typedef struct st_query_mtrl_ctx {
withas_mtrl_plan_t *withas_p;
mtrl_resource_t rs;
bool32 is_ready;
} withas_mtrl_ctx_t;
typedef struct st_vm_view_mtrl_ctx {
vm_view_mtrl_plan_t *vm_view_p;
mtrl_resource_t rs;
bool32 is_ready;
} vm_view_mtrl_ctx_t;
typedef struct st_hash_view_ctx {
bool8 initialized;
bool8 unavailable;
bool8 has_null_key;
bool8 empty_table;
uint32 key_count;
og_type_t types[SQL_MAX_HASH_OPTM_KEYS];
hash_segment_t hash_seg;
hash_table_entry_t hash_table;
} hash_view_ctx_t;
typedef struct st_found_rows_info {
uint64 offset_skipcount;
uint64 limit_skipcount;
} found_rows_info_t;
typedef struct st_hash_join_ctx {
bool32 right_eof : 1;
bool32 has_match : 1;
bool32 need_match_cond : 1;
bool32 need_swap_driver : 1;
bool32 scan_hash_table : 1;
bool32 unused : 27;
bool32 is_find;
cond_tree_t *join_cond;
mtrl_context_t *mtrl_ctx;
og_type_t *key_types;
hash_table_iter_t iter;
} hash_join_ctx_t;
typedef struct st_merge_into_hash_data {
bool32 already_update;
} merge_into_hash_data_t;
typedef struct st_semi_flag {
bool32 flag;
bool32 eof;
} semi_flag_t;
typedef struct st_semi_anchor {
semi_flag_t semi_flags[OG_MAX_JOIN_TABLES];
} semi_anchor_t;
typedef struct st_hash_material {
struct st_sql_cursor *hj_tables[OG_MAX_JOIN_TABLES];
} hash_material_t;
typedef struct st_nl_full_opt_ctx {
uint32 id;
hash_segment_t hash_seg;
hash_entry_t hash_table_entry;
hash_table_iter_t iter;
} nl_full_opt_ctx_t;
typedef struct st_sql_par_mgr sql_par_mgr_t;
typedef struct st_sql_cursor_par_ctx {
sql_par_mgr_t *par_mgr;
bool32 par_threads_inuse : 1;
bool32 par_need_gather : 1;
volatile bool32 par_fetch_st : 2;
bool32 par_exe_flag : 1;
uint32 par_parallel : 8;
uint32 unused : 19;
} sql_cursor_par_ctx_t;
typedef enum st_hash_table_status {
HASH_TABLE_STATUS_NOINIT = 0,
HASH_TABLE_STATUS_CREATE = 1,
HASH_TABLE_STATUS_CLONE = 2,
} hash_table_status_t;
typedef struct st_idx_func_cache_t {
expr_node_t *node;
uint16 tab;
uint16 col;
} idx_func_cache_t;
typedef struct st_sql_cursor {
sql_stmt_t *stmt;
plan_node_t *plan;
union {
sql_merge_t *merge_ctx;
sql_update_t *update_ctx;
sql_delete_t *delete_ctx;
sql_insert_t *insert_ctx;
sql_select_t *select_ctx;
};
cond_tree_t *cond;
sql_query_t *query;
galist_t *columns;
mtrl_page_t *aggr_page;
uint32 total_rows;
uint32 rownum;
uint32 max_rownum;
struct st_sql_cursor *prev;
struct st_sql_cursor *next;
struct st_sql_cursor *cursor_maps[OG_MAX_SUBSELECT_EXPRS];
biqueue_t ssa_cursors;
uint32 last_table;
uint32 table_count;
uint32 id_maps[OG_MAX_JOIN_TABLES];
sql_table_cursor_t *tables;
uint64 scn;
sql_mtrl_handler_t mtrl;
merge_into_hash_data_t merge_into_hash;
connect_by_data_t connect_data;
struct st_sql_cursor *left_cursor;
struct st_sql_cursor *right_cursor;
struct st_sql_cursor *ancestor_ref;
join_data_t *m_join;
found_rows_info_t found_rows;
plan_exec_data_t exec_data;
vmc_t vmc;
hash_segment_t hash_seg;
hash_entry_t hash_table_entry;
hash_join_ctx_t *hash_join_ctx;
union {
group_ctx_t *group_ctx;
group_ctx_t *pivot_ctx;
};
galist_t *nl_full_ctx_list;
unpivot_ctx_t *unpivot_ctx;
hash_mtrl_ctx_t *hash_mtrl_ctx;
hash_material_t hash_mtrl;
semi_anchor_t semi_anchor;
concate_ctx_t *cnct_ctx;
uint32 not_cache : 1;
uint32 is_open : 1;
uint32 is_result_cached : 1;
uint32 exists_result : 1;
uint32 winsort_ready : 1;
uint32 global_cached : 1;
uint32 hash_table_status : 2;
bool32 is_mtrl_cursor : 1;
uint32 is_group_insert : 1;
uint32 reserved : 22;
bool32 eof;
sql_cursor_par_ctx_t par_ctx;
distinct_ctx_t *distinct_ctx;
cb_mtrl_ctx_t *cb_mtrl_ctx;
galist_t *idx_func_cache;
} sql_cursor_t;
#ifdef __cplusplus
extern "C" {
#endif
status_t sql_begin_dml(sql_stmt_t *ogsql_stmt);
status_t sql_try_execute_dml(sql_stmt_t *ogsql_stmt);
status_t sql_execute_single_dml(sql_stmt_t *ogsql_stmt, knl_savepoint_t *savepoint);
status_t sql_execute_fetch(sql_stmt_t *ogsql_stmt);
status_t sql_execute_fetch_medatata(sql_stmt_t *ogsql_stmt);
status_t sql_execute_fetch_cursor_medatata(sql_stmt_t *ogsql_stmt);
status_t sql_alloc_cursor(sql_stmt_t *ogsql_stmt, sql_cursor_t **cursor);
status_t sql_alloc_knl_cursor(sql_stmt_t *ogsql_stmt, knl_cursor_t **cursor);
void sql_close_cursor(sql_stmt_t *ogsql_stmt, sql_cursor_t *ogsql_cursor);
void sql_free_cursor(sql_stmt_t *ogsql_stmt, sql_cursor_t *ogsql_cursor);
status_t sql_make_result_set(sql_stmt_t *stmt, sql_cursor_t *cursor);
void sql_init_varea_set(sql_stmt_t *ogsql_stmt, sql_table_cursor_t *table_cursor);
void sql_free_varea_set(sql_table_cursor_t *table_cursor);
void sql_init_sql_cursor(sql_stmt_t *stmt, sql_cursor_t *ogsql_cursor);
void sql_free_cursors(sql_stmt_t *ogsql_stmt);
status_t sql_alloc_global_sql_cursor(object_t **object);
void sql_free_va_set(sql_stmt_t *ogsql_stmt, sql_cursor_t *ogsql_cursor);
status_t sql_parse_anonymous_soft(sql_stmt_t *ogsql_stmt, word_t *leader, sql_text_t *sql);
status_t sql_parse_anonymous_directly(sql_stmt_t *ogsql_stmt, word_t *leader, sql_text_t *sql);
void sql_free_merge_join_data(sql_stmt_t *ogsql_stmt, join_data_t *m_join);
void sql_free_knl_cursor(sql_stmt_t *ogsql_stmt, knl_cursor_t *ogsql_cursor);
og_type_t sql_get_pending_type(char *pending_buf, uint32 id);
status_t sql_try_put_dml_batch_error(sql_stmt_t *ogsql_stmt, uint32 row, int32 error_code, const char *message);
void sql_release_json_table(sql_table_cursor_t *tab_cursor);
void sql_free_nl_full_opt_ctx(nl_full_opt_ctx_t *opt_ctx);
static inline sql_cursor_t *sql_get_proj_cursor(sql_cursor_t *cursor)
{
return cursor;
}
static inline void sql_inc_rows(sql_stmt_t *ogsql_stmt, sql_cursor_t *cursor)
{
ogsql_stmt->batch_rows++;
cursor->total_rows++;
}
static inline void sql_cursor_cache_result(sql_cursor_t *cursor, bool32 result)
{
cursor->exists_result = result;
cursor->is_result_cached = OG_TRUE;
}
* Get the upper of rownum from a *cond_tree_t*, if the cond_tree is
* null, then return OG_INFINITE32.
*/
#define GET_MAX_ROWNUM(cond) (((cond) != NULL) ? (cond)->rownum_upper : OG_INFINITE32)
static inline status_t sql_get_ancestor_cursor(sql_cursor_t *curr_cur, uint32 ancestor, sql_cursor_t **ancestor_cur)
{
uint32 depth = 0;
*ancestor_cur = curr_cur;
if (curr_cur == NULL) {
OG_THROW_ERROR(ERR_VALUE_ERROR, "no sql prepare cannot get column value");
return OG_ERROR;
}
while (depth < (ancestor)) {
if ((*ancestor_cur)->ancestor_ref == NULL) {
OG_THROW_ERROR(ERR_ANCESTOR_LEVEL_MISMATCH);
return OG_ERROR;
}
(*ancestor_cur) = (*ancestor_cur)->ancestor_ref;
depth++;
}
return OG_SUCCESS;
}
static inline status_t sql_alloc_ssa_cursor(sql_cursor_t *cursor, sql_select_t *select_ctx, uint32 id,
sql_cursor_t **sql_cur)
{
OG_RETURN_IFERR(sql_alloc_cursor(cursor->stmt, sql_cur));
(*sql_cur)->select_ctx = select_ctx;
(*sql_cur)->plan = select_ctx->plan;
(*sql_cur)->scn = cursor->scn;
(*sql_cur)->ancestor_ref = cursor;
(*sql_cur)->global_cached = OG_TRUE;
cursor->cursor_maps[id] = *sql_cur;
biqueue_add_tail(&cursor->ssa_cursors, QUEUE_NODE_OF(*sql_cur));
return OG_SUCCESS;
}
static inline status_t sql_get_ssa_cursor(sql_cursor_t *cursor, sql_select_t *select_ctx, uint32 id,
sql_cursor_t **sql_cur)
{
*sql_cur = cursor->cursor_maps[id];
if (*sql_cur != NULL) {
return OG_SUCCESS;
}
return sql_alloc_ssa_cursor(cursor, select_ctx, id, sql_cur);
}
static inline void sql_init_ssa_cursor_maps(sql_cursor_t *cursor, uint32 ssa_count)
{
for (uint32 i = 0; i < ssa_count; i++) {
cursor->cursor_maps[i] = NULL;
}
}
static inline status_t sql_alloc_table_cursors(sql_cursor_t *cursor, uint32 table_cnt)
{
return vmc_alloc_mem(&cursor->vmc, sizeof(sql_table_cursor_t) * table_cnt, (void **)&cursor->tables);
}
static inline void sql_cursor_hash_table_clone(sql_stmt_t *main_stmt, sql_cursor_t *src, sql_cursor_t *dest)
{
dest->hash_seg = src->hash_seg;
dest->hash_table_entry = src->hash_table_entry;
dest->hash_join_ctx->right_eof = OG_TRUE;
dest->hash_join_ctx->has_match = OG_FALSE;
dest->hash_join_ctx->mtrl_ctx = &main_stmt->mtrl;
dest->mtrl.hash_table_rs = src->mtrl.hash_table_rs;
}
static inline sql_cursor_t *sql_get_group_cursor(sql_cursor_t *cursor)
{
if (cursor->mtrl.cursor.type != MTRL_CURSOR_HASH_GROUP || cursor->exec_data.group_cube == NULL ||
cursor->exec_data.group_cube->fetch_cursor == NULL) {
return cursor;
}
return cursor->exec_data.group_cube->fetch_cursor;
}
void sql_release_multi_parts_resources(sql_stmt_t *ogsql_stmt, sql_table_cursor_t *tab_cur);
#ifdef __cplusplus
}
#endif
#endif