* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* ogsql_aggr.c
*
*
* IDENTIFICATION
* src/ogsql/executor/ogsql_aggr.c
*
* -------------------------------------------------------------------------
*/
#include "ogsql_aggr.h"
#include "ogsql_select.h"
#include "ogsql_proj.h"
#include "ogsql_mtrl.h"
#include "ogsql_group.h"
#include "knl_mtrl.h"
#include "ogsql_scan.h"
#include "ogsql_sort.h"
#ifdef OG_RAC_ING
#include "srv_instance.h"
#include "shd_group.h"
#endif
static status_t sql_mtrl_aggr_page_alloc(sql_stmt_t *stmt, sql_cursor_t *cursor, uint32 size, void **result);
static inline status_t sql_aggr_alloc(sql_stmt_t *stmt, sql_cursor_t *cursor, uint32 size, char **buffer)
{
mtrl_context_t *ogx = &stmt->mtrl;
mtrl_segment_t *segment = ogx->segments[cursor->mtrl.aggr];
mtrl_page_t *page = (mtrl_page_t *)segment->curr_page->data;
if (size > OG_MAX_ROW_SIZE) {
OG_THROW_ERROR(ERR_EXCEED_MAX_ROW_SIZE, size, OG_MAX_ROW_SIZE);
return OG_ERROR;
}
if (page->free_begin + size + sizeof(uint32) > OG_VMEM_PAGE_SIZE - MTRL_DIR_SIZE(page)) {
if (mtrl_extend_segment(ogx, segment) != OG_SUCCESS) {
return OG_ERROR;
}
if (mtrl_open_page(ogx, segment->vm_list.last, &segment->curr_page) != OG_SUCCESS) {
return OG_ERROR;
}
page = (mtrl_page_t *)segment->curr_page->data;
mtrl_init_page(page, segment->vm_list.last);
}
*buffer = (char *)page + page->free_begin;
page->free_begin += size;
return OG_SUCCESS;
}
static inline status_t sql_copy_aggr_value_by_string(sql_stmt_t *stmt, sql_cursor_t *cursor, variant_t *value,
aggr_var_t *result)
{
if (value->v_text.len == 0) {
return OG_SUCCESS;
}
if (value->v_text.len > OG_MAX_ROW_SIZE) {
OG_THROW_ERROR(ERR_EXCEED_MAX_ROW_SIZE, value->v_text.len, OG_MAX_ROW_SIZE);
return OG_ERROR;
}
aggr_str_t *aggr_str = GET_AGGR_VAR_STR_EX(result);
if (value->v_text.len > aggr_str->aggr_bufsize) {
aggr_str->aggr_bufsize = MAX(aggr_str->aggr_bufsize * AGGR_BUF_SIZE_FACTOR, value->v_text.len);
OG_RETURN_IFERR(sql_aggr_alloc(stmt, cursor, aggr_str->aggr_bufsize, &result->var.v_text.str));
}
MEMS_RETURN_IFERR(memcpy_s(result->var.v_text.str, aggr_str->aggr_bufsize, value->v_text.str, value->v_text.len));
result->var.v_text.len = value->v_text.len;
return OG_SUCCESS;
}
static inline status_t sql_copy_aggr_value_by_binary(sql_stmt_t *stmt, sql_cursor_t *cursor, variant_t *value,
aggr_var_t *result)
{
if (value->v_bin.size == 0) {
return OG_SUCCESS;
}
if (value->v_bin.size > OG_MAX_ROW_SIZE) {
OG_THROW_ERROR(ERR_EXCEED_MAX_ROW_SIZE, value->v_bin.size, OG_MAX_ROW_SIZE);
return OG_ERROR;
}
aggr_str_t *aggr_str = GET_AGGR_VAR_STR_EX(result);
if (aggr_str == NULL) {
OG_THROW_ERROR(ERR_ASSERT_ERROR, "aggr_str != NULL");
return OG_ERROR;
}
if (value->v_bin.size > aggr_str->aggr_bufsize) {
aggr_str->aggr_bufsize = MAX(aggr_str->aggr_bufsize * AGGR_BUF_SIZE_FACTOR, value->v_bin.size);
OG_RETURN_IFERR(sql_aggr_alloc(stmt, cursor, aggr_str->aggr_bufsize, (char **)&result->var.v_bin.bytes));
}
MEMS_RETURN_IFERR(memcpy_s(result->var.v_bin.bytes, aggr_str->aggr_bufsize, value->v_bin.bytes, value->v_bin.size));
result->var.v_bin.size = value->v_bin.size;
return OG_SUCCESS;
}
static inline status_t sql_copy_aggr_value(sql_stmt_t *stmt, sql_cursor_t *cursor, variant_t *value, aggr_var_t *result)
{
switch (result->var.type) {
case OG_TYPE_NUMBER:
case OG_TYPE_DECIMAL:
case OG_TYPE_NUMBER2:
result->var.ctrl = value->ctrl;
cm_dec_copy(&result->var.v_dec, &value->v_dec);
return OG_SUCCESS;
case OG_TYPE_UINT32:
case OG_TYPE_INTEGER:
case OG_TYPE_BIGINT:
case OG_TYPE_REAL:
case OG_TYPE_DATE:
case OG_TYPE_TIMESTAMP:
case OG_TYPE_TIMESTAMP_TZ_FAKE:
case OG_TYPE_TIMESTAMP_TZ:
case OG_TYPE_TIMESTAMP_LTZ:
case OG_TYPE_INTERVAL_DS:
case OG_TYPE_INTERVAL_YM:
case OG_TYPE_BOOLEAN:
var_copy(value, &result->var);
break;
case OG_TYPE_CHAR:
case OG_TYPE_VARCHAR:
case OG_TYPE_STRING:
return sql_copy_aggr_value_by_string(stmt, cursor, value, result);
case OG_TYPE_BINARY:
case OG_TYPE_VARBINARY:
case OG_TYPE_RAW:
return sql_copy_aggr_value_by_binary(stmt, cursor, value, result);
default:
OG_SET_ERROR_MISMATCH_EX(result->var.type);
return OG_ERROR;
}
return OG_SUCCESS;
}
static status_t sql_aggr_get_value_from_vm(sql_stmt_t *stmt, sql_cursor_t *cursor, aggr_var_t *aggr_var,
bool32 keep_old_open)
{
aggr_str_t *aggr_str = GET_AGGR_VAR_STR_EX(aggr_var);
mtrl_rowid_t *rid = &aggr_str->str_result;
mtrl_segment_t *segment = stmt->mtrl.segments[cursor->mtrl.aggr_str];
mtrl_page_t *page = (mtrl_page_t *)segment->curr_page->data;
if (rid->vmid != OG_INVALID_ID32) {
if (page != NULL && page->id != rid->vmid) {
if (!keep_old_open) {
mtrl_close_page(&stmt->mtrl, page->id);
segment->curr_page = NULL;
}
page = NULL;
}
if (page == NULL) {
OG_RETURN_IFERR(mtrl_open_page(&stmt->mtrl, rid->vmid, &segment->curr_page));
page = (mtrl_page_t *)segment->curr_page->data;
}
aggr_var->var.v_text.str = MTRL_GET_ROW(page, rid->slot) + sizeof(row_head_t) + sizeof(uint16);
}
return OG_SUCCESS;
}
static status_t sql_aggr_ensure_str_buf(sql_stmt_t *stmt, sql_cursor_t *cursor, aggr_var_t *aggr_var,
uint32 ensure_size, bool32 keep_value)
{
char *buf = NULL;
mtrl_rowid_t rid;
uint32 rsv_size;
uint32 row_size;
if (ensure_size > OG_MAX_ROW_SIZE) {
OG_THROW_ERROR(ERR_EXCEED_MAX_ROW_SIZE, ensure_size, OG_MAX_ROW_SIZE);
return OG_ERROR;
}
aggr_str_t *aggr_str = GET_AGGR_VAR_STR_EX(aggr_var);
if (ensure_size <= aggr_str->aggr_bufsize) {
return OG_SUCCESS;
}
if (cursor->mtrl.aggr_str == OG_INVALID_ID32) {
OG_RETURN_IFERR(mtrl_create_segment(&stmt->mtrl, MTRL_SEGMENT_EXTRA_DATA, NULL, &cursor->mtrl.aggr_str));
OG_RETURN_IFERR(mtrl_open_segment(&stmt->mtrl, cursor->mtrl.aggr_str));
}
rsv_size = MAX(SQL_GROUP_CONCAT_STR_LEN, aggr_str->aggr_bufsize * AGGR_BUF_SIZE_FACTOR);
rsv_size = MAX(rsv_size, ensure_size);
rsv_size = MIN(rsv_size, OG_MAX_ROW_SIZE);
OG_RETURN_IFERR(sql_push(stmt, (uint32)(OG_MAX_ROW_SIZE + sizeof(row_head_t) + sizeof(uint16)), (void **)&buf));
row_head_t *head = (row_head_t *)buf;
head->flags = 0;
head->itl_id = OG_INVALID_ID8;
head->column_count = (uint16)1;
row_size = rsv_size + sizeof(row_head_t) + sizeof(uint16);
head->size = row_size;
if (mtrl_insert_row(&stmt->mtrl, cursor->mtrl.aggr_str, buf, &rid) != OG_SUCCESS) {
OGSQL_POP(stmt);
return OG_ERROR;
}
OGSQL_POP(stmt);
if (keep_value && !aggr_var->var.is_null && aggr_var->var.v_text.len > 0) {
uint32 old_vmid = aggr_str->str_result.vmid;
OG_RETURN_IFERR(sql_aggr_get_value_from_vm(stmt, cursor, aggr_var, OG_FALSE));
variant_t old_var = aggr_var->var;
aggr_str->str_result = rid;
OG_RETURN_IFERR(sql_aggr_get_value_from_vm(stmt, cursor, aggr_var, OG_TRUE));
MEMS_RETURN_IFERR(memcpy_s(aggr_var->var.v_text.str, rsv_size, old_var.v_text.str, old_var.v_text.len));
if (old_vmid != rid.vmid && old_vmid != OG_INVALID_ID32) {
mtrl_close_page(&stmt->mtrl, old_vmid);
}
} else {
aggr_str->str_result = rid;
}
aggr_str->aggr_bufsize = rsv_size;
return OG_SUCCESS;
}
static status_t sql_aggr_save_aggr_str_value(sql_stmt_t *stmt, sql_cursor_t *cursor, aggr_var_t *aggr_var,
variant_t *value)
{
if (value->is_null) {
return OG_SUCCESS;
}
if (value->v_text.len != 0) {
aggr_str_t *aggr_str = GET_AGGR_VAR_STR_EX(aggr_var);
OG_RETURN_IFERR(sql_aggr_ensure_str_buf(stmt, cursor, aggr_var, value->v_text.len, OG_FALSE));
OG_RETURN_IFERR(sql_aggr_get_value_from_vm(stmt, cursor, aggr_var, OG_FALSE));
MEMS_RETURN_IFERR(
memcpy_s(aggr_var->var.v_text.str, aggr_str->aggr_bufsize, value->v_text.str, value->v_text.len));
}
aggr_var->var.v_text.len = value->v_text.len;
aggr_var->var.is_null = OG_FALSE;
return OG_SUCCESS;
}
static inline status_t sql_aggr_none(aggr_assist_t *ogsql_stmt, aggr_var_t *aggr_var, variant_t *value)
{
OG_THROW_ERROR(ERR_UNKNOWN_ARRG_OPER);
return OG_ERROR;
}
static inline status_t sql_aggr_count(aggr_assist_t *ogsql_stmt, aggr_var_t *aggr_var, variant_t *value)
{
VALUE(int64, &aggr_var->var) += VALUE(int64, value);
return OG_SUCCESS;
}
static inline status_t sql_aggr_cume_dist(aggr_assist_t *ogsql_stmt, aggr_var_t *aggr_var, variant_t *value)
{
GET_AGGR_VAR_AVG(aggr_var)->ex_avg_count += ogsql_stmt->avg_count;
if (SECUREC_LIKELY(!aggr_var->var.is_null)) {
return sql_aggr_sum_value(ogsql_stmt->stmt, &aggr_var->var, value);
}
var_copy(value, &aggr_var->var);
return OG_SUCCESS;
}
static inline status_t sql_aggr_sum(aggr_assist_t *ogsql_stmt, aggr_var_t *aggr_var, variant_t *value)
{
if (SECUREC_LIKELY(!aggr_var->var.is_null)) {
return sql_aggr_sum_value(ogsql_stmt->stmt, &aggr_var->var, value);
}
if (value->type != aggr_var->var.type) {
if (aggr_var->var.type != OG_TYPE_UNKNOWN) {
OG_RETURN_IFERR(var_convert(SESSION_NLS(ogsql_stmt->stmt), value, aggr_var->var.type, NULL));
}
}
var_copy(value, &aggr_var->var);
return OG_SUCCESS;
}
static inline status_t sql_aggr_avg(aggr_assist_t *ogsql_stmt, aggr_var_t *aggr_var, variant_t *value)
{
GET_AGGR_VAR_AVG(aggr_var)->ex_avg_count += ogsql_stmt->avg_count;
return sql_aggr_sum(ogsql_stmt, aggr_var, value);
}
static status_t sql_aggr_min_max(aggr_assist_t *ass, aggr_var_t *var, variant_t *value)
{
if (value->type != var->var.type) {
if (var->var.type == OG_TYPE_UNKNOWN) {
var->var.type = value->type;
if (OG_IS_VARLEN_TYPE(var->var.type)) {
aggr_str_t *aggr_str = NULL;
OG_RETURN_IFERR(sql_mtrl_aggr_page_alloc(ass->stmt, ass->cursor, sizeof(aggr_str_t), (void
**)&aggr_str));
var->extra_offset = (uint32)((char *)aggr_str - (char *)var);
var->extra_size = sizeof(aggr_str_t);
aggr_str->aggr_bufsize = 0;
aggr_str->str_result.vmid = OG_INVALID_ID32;
aggr_str->str_result.slot = OG_INVALID_ID32;
}
} else {
OG_RETURN_IFERR(sql_convert_variant(ass->stmt, value, var->var.type));
}
}
int32 cmp_result;
if (var->var.is_null) {
var->var.is_null = OG_FALSE;
return sql_copy_aggr_value(ass->stmt, ass->cursor, value, var);
}
OG_RETURN_IFERR(sql_compare_variant(ass->stmt, &var->var, value, &cmp_result));
if ((ass->aggr_type == AGGR_TYPE_MIN && cmp_result > 0) || (ass->aggr_type == AGGR_TYPE_MAX && cmp_result < 0)) {
return sql_copy_aggr_value(ass->stmt, ass->cursor, value, var);
}
return OG_SUCCESS;
}
static status_t sql_aggr_listagg_sort(aggr_assist_t *ogsql_stmt, aggr_var_t *aggr_var, variant_t *value)
{
status_t status = OG_ERROR;
char *buf = NULL;
row_assist_t ra;
uint32 seg_id = ogsql_stmt->cursor->mtrl.sort_seg;
if (value->is_null) {
return OG_SUCCESS;
}
OGSQL_SAVE_STACK(ogsql_stmt->stmt);
sql_keep_stack_variant(ogsql_stmt->stmt, value);
if (sql_push(ogsql_stmt->stmt, OG_MAX_ROW_SIZE, (void **)&buf) != OG_SUCCESS) {
OGSQL_RESTORE_STACK(ogsql_stmt->stmt);
return OG_ERROR;
}
row_init(&ra, buf, OG_MAX_ROW_SIZE, ogsql_stmt->aggr_node->sort_items->count + 1);
do {
aggr_group_concat_t *aggr_group = GET_AGGR_VAR_GROUPCONCAT(aggr_var);
char* type_buf = aggr_group->type_buf;
OG_BREAK_IF_ERROR(sql_hash_group_make_sort_row(ogsql_stmt->stmt, ogsql_stmt->aggr_node, &ra,
type_buf, value, ogsql_stmt->cursor));
if (aggr_group->total_len != 0 && !aggr_group->extra.is_null) {
aggr_group->total_len += aggr_group->extra.v_text.len;
}
aggr_group->total_len += value->v_text.len;
if (aggr_group->total_len > OG_MAX_ROW_SIZE) {
OG_THROW_ERROR(ERR_EXCEED_MAX_ROW_SIZE, aggr_group->total_len, OG_MAX_ROW_SIZE);
break;
}
status = sql_hash_group_mtrl_insert_row(ogsql_stmt->stmt, seg_id, ogsql_stmt->aggr_node, aggr_var, buf);
aggr_var->var.is_null = OG_FALSE;
} while (0);
OGSQL_RESTORE_STACK(ogsql_stmt->stmt);
return status;
}
static status_t sql_aggr_listagg(aggr_assist_t *ass, aggr_var_t *aggr_var, variant_t *val)
{
if (ass->aggr_node->sort_items != NULL) {
return sql_aggr_listagg_sort(ass, aggr_var, val);
}
status_t status = OG_ERROR;
aggr_group_concat_t *aggr_group = GET_AGGR_VAR_GROUPCONCAT(aggr_var);
variant_t *sep_var = &aggr_group->extra;
bool32 has_sep = (bool32)(!sep_var->is_null && sep_var->v_text.len > 0);
if (val->is_null) {
return OG_SUCCESS;
}
OGSQL_SAVE_STACK(ass->stmt);
sql_keep_stack_variant(ass->stmt, val);
do {
if (aggr_var->var.is_null) {
status = sql_aggr_save_aggr_str_value(ass->stmt, ass->cursor, aggr_var, val);
break;
}
uint32 len = aggr_var->var.v_text.len + val->v_text.len;
if (has_sep) {
len += sep_var->v_text.len;
}
OG_BREAK_IF_ERROR(sql_aggr_ensure_str_buf(ass->stmt, ass->cursor, aggr_var, len, OG_TRUE));
OG_BREAK_IF_ERROR(sql_aggr_get_value_from_vm(ass->stmt, ass->cursor, aggr_var, OG_FALSE));
char *cur_buffer = aggr_var->var.v_text.str + aggr_var->var.v_text.len;
uint32 remain_len = len - aggr_var->var.v_text.len;
if (has_sep) {
MEMS_RETURN_IFERR(memcpy_sp(cur_buffer, remain_len, sep_var->v_text.str, sep_var->v_text.len));
cur_buffer += sep_var->v_text.len;
remain_len -= sep_var->v_text.len;
}
if (val->v_text.len != 0) {
MEMS_RETURN_IFERR(memcpy_sp(cur_buffer, remain_len, val->v_text.str, val->v_text.len));
}
status = OG_SUCCESS;
aggr_var->var.v_text.len = len;
} while (0);
OGSQL_RESTORE_STACK(ass->stmt);
return status;
}
static status_t sql_get_compare_res4rank(sql_stmt_t *sql_statement, sort_item_t *sort_item, variant_t *cst_var,
variant_t *sort_var, int32 *compare_res)
{
bool32 nvl_first = sort_item->nulls_pos == SORT_NULLS_FIRST || sort_item->nulls_pos == SORT_NULLS_DEFAULT;
if (cst_var->is_null && sort_var->is_null) {
*compare_res = 0;
return OG_SUCCESS;
}
if (cst_var->is_null) {
*compare_res = nvl_first ? -1 : 1;
return OG_SUCCESS;
}
if (sort_var->is_null) {
*compare_res = -1;
if (nvl_first) {
*compare_res = 1;
}
return OG_SUCCESS;
}
status_t status;
if (!OG_IS_NUMERIC_TYPE(sort_var->type)) {
status = sql_convert_variant(sql_statement, cst_var, sort_var->type);
if (status != OG_SUCCESS) {
OG_LOG_RUN_ERR("func:%s revoke func:%s failed.", "sql_get_compare_res4rank", "sql_convert_variant");
return status;
}
sql_keep_stack_variant(sql_statement, cst_var);
}
status = sql_compare_variant(sql_statement, cst_var, sort_var, compare_res);
if (status != OG_SUCCESS) {
OG_LOG_RUN_ERR("func:%s revoke func:%s failed.", "sql_get_compare_res4rank", "sql_compare_variant");
return status;
}
if (sort_item->direction == SORT_MODE_DESC) {
*compare_res *= -1;
}
return OG_SUCCESS;
}
status_t sql_compare_sort_row_for_rank(sql_stmt_t *ogsql_stmt, expr_node_t *aggr_node, bool32 *flag)
{
int32 cmp_result = 0;
variant_t sort_var;
variant_t constant;
sort_item_t *sort_item = NULL;
expr_tree_t *arg = NULL;
arg = aggr_node->argument;
OGSQL_SAVE_STACK(ogsql_stmt);
for (uint32 i = 0; i < aggr_node->sort_items->count; ++i) {
OGSQL_RESTORE_STACK(ogsql_stmt);
sort_item = (sort_item_t *)cm_galist_get(aggr_node->sort_items, i);
OG_RETURN_IFERR(sql_exec_expr(ogsql_stmt, sort_item->expr, &sort_var));
sql_keep_stack_variant(ogsql_stmt, &sort_var);
OG_RETURN_IFERR(sql_exec_expr(ogsql_stmt, arg, &constant));
sql_keep_stack_variant(ogsql_stmt, &constant);
OG_RETURN_IFERR(sql_get_compare_res4rank(ogsql_stmt, sort_item, &constant, &sort_var, &cmp_result));
if (cmp_result < 0) {
*flag = OG_FALSE;
return OG_SUCCESS;
} else if (cmp_result > 0) {
return OG_SUCCESS;
} else {
arg = arg->next;
}
}
*flag = OG_FALSE;
return OG_SUCCESS;
}
static status_t sql_aggr_make_sort_row(sql_stmt_t *stmt, expr_node_t *aggr_node, row_assist_t *ra)
{
uint32 i;
variant_t sort_var;
sort_item_t *sort_item = NULL;
for (i = 0; i < aggr_node->sort_items->count; ++i) {
sort_item = (sort_item_t *)cm_galist_get(aggr_node->sort_items, i);
OG_RETURN_IFERR(sql_exec_expr(stmt, sort_item->expr, &sort_var));
OG_RETURN_IFERR(sql_put_row_value(stmt, NULL, ra, sort_var.type, &sort_var));
}
return OG_SUCCESS;
}
static status_t sql_aggr_init_hash_tbl(aggr_assist_t *aggr_ass, aggr_var_t *aggr_var,
aggr_dense_rank_t *aggr_dense_rank)
{
CM_POINTER3(aggr_ass, aggr_var, aggr_dense_rank);
aggr_var->var.is_null = OG_FALSE;
vm_hash_segment_init(KNL_SESSION(aggr_ass->stmt), aggr_ass->stmt->mtrl.pool,
&aggr_dense_rank->hash_segment, PMA_POOL, HASH_PAGES_HOLD, HASH_AREA_SIZE);
OG_RETURN_IFERR(vm_hash_table_alloc(&aggr_dense_rank->table_entry, &aggr_dense_rank->hash_segment, 0));
return vm_hash_table_init(&aggr_dense_rank->hash_segment, &aggr_dense_rank->table_entry,
NULL, NULL, NULL);
}
static status_t sql_aggr_dense_rank(aggr_assist_t *aggr_ast, aggr_var_t *aggr_var, variant_t *var)
{
char *row_buf = NULL;
row_assist_t row_asst = {0};
bool32 match_found = OG_FALSE;
aggr_dense_rank_t *aggr_dense_rank = GET_AGGR_VAR_DENSE_RANK(aggr_var);
if (aggr_var->var.is_null) {
OG_RETURN_IFERR(sql_aggr_init_hash_tbl(aggr_ast, aggr_var, aggr_dense_rank));
}
OG_RETURN_IFERR(ogsql_func_rank(aggr_ast->stmt, aggr_ast->aggr_node, var));
OG_RETSUC_IFTRUE(var->v_int == 0);
OGSQL_SAVE_STACK(aggr_ast->stmt);
OG_RETURN_IFERR(sql_push(aggr_ast->stmt, OG_MAX_ROW_SIZE, (void **)&row_buf));
row_init(&row_asst, row_buf, OG_MAX_ROW_SIZE, aggr_ast->aggr_node->sort_items->count);
status_t ret = OG_ERROR;
do {
OG_BREAK_IF_ERROR(sql_aggr_make_sort_row(aggr_ast->stmt, aggr_ast->aggr_node, &row_asst));
OG_BREAK_IF_ERROR(vm_hash_table_insert2(&match_found, &aggr_dense_rank->hash_segment,
&aggr_dense_rank->table_entry, row_buf, row_asst.head->size));
ret = OG_SUCCESS;
} while (OG_FALSE);
if (ret != OG_SUCCESS) {
OGSQL_RESTORE_STACK(aggr_ast->stmt);
return OG_ERROR;
}
if (match_found == OG_FALSE) {
VALUE(uint32, &aggr_var->var) += VALUE(uint32, var);
}
OGSQL_RESTORE_STACK(aggr_ast->stmt);
return OG_SUCCESS;
}
static status_t sql_aggr_rank(aggr_assist_t *ogsql_stmt, aggr_var_t *aggr_val, variant_t *var)
{
aggr_val->var.is_null = OG_FALSE;
VALUE(uint32, &aggr_val->var) += VALUE(uint32, var);
return OG_SUCCESS;
}
static inline status_t sql_aggr_calc_group_concat_sort(sql_stmt_t *stmt, sql_cursor_t *cursor, expr_node_t *aggr_node,
aggr_var_t *aggr_var)
{
return sql_hash_group_calc_listagg(stmt, cursor, aggr_node, aggr_var, &cursor->exec_data.sort_concat);
}
static status_t sql_aggr_median(aggr_assist_t *ogsql_stmt, aggr_var_t *aggr_var, variant_t *value)
{
status_t status = OG_ERROR;
char *buf = NULL;
row_assist_t ra;
uint32 seg_id = ogsql_stmt->cursor->mtrl.sort_seg;
og_type_t type = TREE_DATATYPE(ogsql_stmt->aggr_node->argument);
GET_AGGR_VAR_MEDIAN(aggr_var)->median_count += ogsql_stmt->avg_count;
if (value->is_null) {
return OG_SUCCESS;
}
if (aggr_var->var.is_null) {
if (!OG_IS_NUMERIC_TYPE(value->type) && !OG_IS_DATETIME_TYPE(value->type)) {
OG_THROW_ERROR(ERR_TYPE_MISMATCH, "NUMERIC OR DATETIME", get_datatype_name_str(value->type));
return OG_ERROR;
}
var_copy(value, &aggr_var->var);
}
if (type == OG_TYPE_UNKNOWN) {
type = aggr_var->var.type;
}
OGSQL_SAVE_STACK(ogsql_stmt->stmt);
sql_keep_stack_variant(ogsql_stmt->stmt, value);
if (sql_push(ogsql_stmt->stmt, OG_MAX_ROW_SIZE, (void **)&buf) != OG_SUCCESS) {
OGSQL_RESTORE_STACK(ogsql_stmt->stmt);
return OG_ERROR;
}
row_init(&ra, buf, OG_MAX_ROW_SIZE, 1);
do {
OG_BREAK_IF_ERROR(sql_put_row_value(ogsql_stmt->stmt, NULL, &ra, type, value));
status = sql_hash_group_mtrl_insert_row(ogsql_stmt->stmt, seg_id, ogsql_stmt->aggr_node, aggr_var, buf);
} while (0);
OGSQL_RESTORE_STACK(ogsql_stmt->stmt);
return status;
}
static status_t sql_aggr_stddev(aggr_assist_t *ogsql_stmt, aggr_var_t *aggr_var, variant_t *value)
{
aggr_stddev_t *aggr_stddev = GET_AGGR_VAR_STDDEV(aggr_var);
if (aggr_var->var.is_null == OG_TRUE) {
aggr_var->var.is_null = OG_FALSE;
aggr_var->var.type = OG_TYPE_NUMBER;
cm_zero_dec(&aggr_var->var.v_dec);
aggr_stddev->extra.is_null = OG_FALSE;
aggr_stddev->extra.type = OG_TYPE_NUMBER;
cm_zero_dec(&aggr_stddev->extra.v_dec);
}
variant_t tmp_square_var;
tmp_square_var.is_null = OG_FALSE;
tmp_square_var.type = OG_TYPE_NUMBER;
OG_RETURN_IFERR(opr_exec(OPER_TYPE_MUL, SESSION_NLS(ogsql_stmt->stmt), value, value,
&tmp_square_var));
OG_RETURN_IFERR(opr_exec(OPER_TYPE_ADD, SESSION_NLS(ogsql_stmt->stmt), &aggr_var->var, &tmp_square_var,
&aggr_var->var));
OG_RETURN_IFERR(opr_exec(OPER_TYPE_ADD, SESSION_NLS(ogsql_stmt->stmt), &aggr_stddev->extra, value,
&aggr_stddev->extra));
aggr_stddev->ex_count++;
return OG_SUCCESS;
}
static status_t sql_aggr_corr(aggr_assist_t *ogsql_stmt, aggr_var_t *aggr_var, variant_t *values)
{
sql_stmt_t *stmt = ogsql_stmt->stmt;
variant_t *value = &values[0];
variant_t *value_extra = &values[1];
variant_t tmp_square_var;
tmp_square_var.is_null = OG_FALSE;
tmp_square_var.type = OG_TYPE_NUMBER;
aggr_corr_t *a_corr = GET_AGGR_VAR_CORR(aggr_var);
if (aggr_var->var.is_null == OG_TRUE) {
aggr_var->var.is_null = OG_FALSE;
aggr_var->var.type = OG_TYPE_NUMBER;
a_corr->extra[CORR_VAR_SUM_X].is_null = OG_FALSE;
a_corr->extra[CORR_VAR_SUM_Y].is_null = OG_FALSE;
a_corr->extra[CORR_VAR_SUM_XX].is_null = OG_FALSE;
a_corr->extra[CORR_VAR_SUM_YY].is_null = OG_FALSE;
a_corr->extra[CORR_VAR_SUM_X].type = OG_TYPE_NUMBER;
a_corr->extra[CORR_VAR_SUM_Y].type = OG_TYPE_NUMBER;
a_corr->extra[CORR_VAR_SUM_XX].type = OG_TYPE_NUMBER;
a_corr->extra[CORR_VAR_SUM_YY].type = OG_TYPE_NUMBER;
cm_zero_dec(&aggr_var->var.v_dec);
cm_zero_dec(&a_corr->extra[CORR_VAR_SUM_X].v_dec);
cm_zero_dec(&a_corr->extra[CORR_VAR_SUM_Y].v_dec);
cm_zero_dec(&a_corr->extra[CORR_VAR_SUM_XX].v_dec);
cm_zero_dec(&a_corr->extra[CORR_VAR_SUM_YY].v_dec);
}
if (value->is_null || value_extra->is_null) {
return OG_SUCCESS;
}
OG_RETURN_IFERR(opr_exec(OPER_TYPE_MUL, SESSION_NLS(stmt), value, value_extra, &tmp_square_var));
OG_RETURN_IFERR(
opr_exec(OPER_TYPE_ADD, SESSION_NLS(stmt), &aggr_var->var, &tmp_square_var, &aggr_var->var));
OG_RETURN_IFERR(opr_exec(OPER_TYPE_ADD, SESSION_NLS(stmt), &a_corr->extra[CORR_VAR_SUM_X], value,
&a_corr->extra[CORR_VAR_SUM_X]));
OG_RETURN_IFERR(opr_exec(OPER_TYPE_ADD, SESSION_NLS(stmt), &a_corr->extra[CORR_VAR_SUM_Y], value_extra,
&a_corr->extra[CORR_VAR_SUM_Y]));
OG_RETURN_IFERR(opr_exec(OPER_TYPE_MUL, SESSION_NLS(stmt), value, value, &tmp_square_var));
OG_RETURN_IFERR(opr_exec(OPER_TYPE_ADD, SESSION_NLS(stmt), &a_corr->extra[CORR_VAR_SUM_XX], &tmp_square_var,
&a_corr->extra[CORR_VAR_SUM_XX]));
OG_RETURN_IFERR(opr_exec(OPER_TYPE_MUL, SESSION_NLS(stmt), value_extra, value_extra, &tmp_square_var));
OG_RETURN_IFERR(opr_exec(OPER_TYPE_ADD, SESSION_NLS(stmt), &a_corr->extra[CORR_VAR_SUM_YY], &tmp_square_var,
&a_corr->extra[CORR_VAR_SUM_YY]));
a_corr->ex_count++;
return OG_SUCCESS;
}
static status_t sql_aggr_covar(aggr_assist_t *ass, aggr_var_t *v_aggr, variant_t *values)
{
sql_stmt_t *stmt = ass->stmt;
variant_t *value = &values[0];
variant_t *value_extra = &values[1];
variant_t tmp_square_var;
tmp_square_var.is_null = OG_FALSE;
tmp_square_var.type = OG_TYPE_NUMBER;
aggr_covar_t *aggr_covar = GET_AGGR_VAR_COVAR(v_aggr);
if (value->is_null || value_extra->is_null) {
return OG_SUCCESS;
}
if (v_aggr->var.is_null == OG_TRUE) {
v_aggr->var.is_null = OG_FALSE;
v_aggr->var.type = OG_TYPE_NUMBER;
aggr_covar->extra.is_null = OG_FALSE;
aggr_covar->extra.type = OG_TYPE_NUMBER;
aggr_covar->extra_1.is_null = OG_FALSE;
aggr_covar->extra_1.type = OG_TYPE_NUMBER;
cm_zero_dec(&v_aggr->var.v_dec);
cm_zero_dec(&aggr_covar->extra.v_dec);
cm_zero_dec(&aggr_covar->extra_1.v_dec);
}
OG_RETURN_IFERR(opr_exec(OPER_TYPE_MUL, SESSION_NLS(stmt), value, value_extra,
&tmp_square_var));
OG_RETURN_IFERR(opr_exec(OPER_TYPE_ADD, SESSION_NLS(stmt), &v_aggr->var, &tmp_square_var,
&v_aggr->var));
OG_RETURN_IFERR(opr_exec(OPER_TYPE_ADD, SESSION_NLS(stmt), &aggr_covar->extra, value,
&aggr_covar->extra));
OG_RETURN_IFERR(opr_exec(OPER_TYPE_ADD, SESSION_NLS(stmt), &aggr_covar->extra_1, value_extra,
&aggr_covar->extra_1));
aggr_covar->ex_count++;
return OG_SUCCESS;
}
static status_t sql_aggr_array_agg(aggr_assist_t *ogsql_stmt, aggr_var_t *aggr_var, variant_t *value)
{
var_array_t *varray = &aggr_var->var.v_array;
array_assist_t array_a;
if (SECUREC_UNLIKELY(varray->type == OG_TYPE_UNKNOWN)) {
varray->type = value->type;
if (!cm_datatype_arrayable(value->type)) {
OG_SRC_THROW_ERROR(ogsql_stmt->aggr_node->argument->root->loc, ERR_INVALID_ARG_TYPE);
return OG_ERROR;
}
}
OG_RETURN_IFERR(sql_convert_variant(ogsql_stmt->stmt, value, varray->type));
if (varray->value.type == OG_LOB_FROM_KERNEL) {
vm_lob_t vlob;
vlob.node_id = 0;
vlob.unused = 0;
OG_RETURN_IFERR(sql_get_array_from_knl_lob(ogsql_stmt->stmt, (knl_handle_t)(varray->value.knl_lob.bytes),
&vlob));
varray->value.vm_lob = vlob;
varray->value.type = OG_LOB_FROM_VMPOOL;
}
varray->count++;
ARRAY_INIT_ASSIST_INFO(&array_a, ogsql_stmt->stmt);
OG_RETURN_IFERR(sql_exec_array_element(ogsql_stmt->stmt, &array_a, varray->count, value, OG_TRUE,
&varray->value.vm_lob));
return array_update_head_datatype(&array_a, &varray->value.vm_lob, value->type);
}
static status_t sql_aggr_distinct_value(aggr_assist_t *ass, variant_t *value, bool32 *var_exist)
{
char *buf = NULL;
row_assist_t ra;
hash_segment_t *hash_segment = NULL;
hash_table_entry_t *hash_table_entry = NULL;
sql_stmt_t *stmt = ass->stmt;
OGSQL_SAVE_STACK(stmt);
sql_keep_stack_variant(stmt, value);
if (sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&buf) != OG_SUCCESS) {
OGSQL_RESTORE_STACK(stmt);
return OG_ERROR;
}
row_init(&ra, buf, OG_MAX_ROW_SIZE, 1);
if (sql_put_row_value(stmt, NULL, &ra, value->type, value) != OG_SUCCESS) {
OGSQL_RESTORE_STACK(stmt);
return OG_ERROR;
}
so we won't guard the case that varea_read() returned a NULL
we'd rather it cored if that really has happened.
*/
hash_segment = (hash_segment_t *)ass->cursor->exec_data.aggr_dis;
hash_table_entry = (hash_table_entry_t *)((char *)hash_segment + sizeof(hash_segment_t));
if (vm_hash_table_insert2(var_exist, hash_segment, &hash_table_entry[ass->aggr_node->dis_info.idx], buf,
ra.head->size) != OG_SUCCESS) {
OGSQL_RESTORE_STACK(stmt);
return OG_ERROR;
}
OGSQL_RESTORE_STACK(stmt);
return OG_SUCCESS;
}
status_t sql_aggr_value(aggr_assist_t *aggr_ass, uint32 aggr_idx, variant_t *value)
{
aggr_var_t *aggr_var = NULL;
if (value->is_null && value->type != OG_TYPE_ARRAY) {
if (aggr_ass->aggr_node->nullaware &&
(aggr_ass->aggr_type == AGGR_TYPE_MIN || aggr_ass->aggr_type == AGGR_TYPE_MAX)) {
aggr_var = sql_get_aggr_addr(aggr_ass->cursor, aggr_idx);
aggr_var->var.is_null = OG_TRUE;
aggr_ass->cursor->eof = OG_TRUE;
}
if (aggr_ass->aggr_type != AGGR_TYPE_ARRAY_AGG) {
return OG_SUCCESS;
}
}
if (aggr_ass->aggr_node->dis_info.need_distinct) {
bool32 var_exist = OG_FALSE;
OG_RETURN_IFERR(sql_aggr_distinct_value(aggr_ass, value, &var_exist));
if (var_exist) {
return OG_SUCCESS;
}
if (aggr_ass->aggr_type == AGGR_TYPE_COUNT) {
value->type = OG_TYPE_BIGINT;
value->v_bigint = 1;
}
}
aggr_var = sql_get_aggr_addr(aggr_ass->cursor, aggr_idx);
if (aggr_ass->aggr_type == AGGR_TYPE_SUM &&
SECUREC_LIKELY(!aggr_var->var.is_null) && VAR_IS_NUMBERIC_ZERO(value)) {
return OG_SUCCESS;
}
return sql_get_aggr_func(aggr_ass->aggr_type)->invoke(aggr_ass, aggr_var, value);
}
static inline status_t sql_aggr_calc_none(aggr_assist_t *ogsql_stmt, aggr_var_t *aggr_var)
{
return OG_SUCCESS;
}
static inline status_t sql_aggr_calc_avg(aggr_assist_t *ogsql_stmt, aggr_var_t *aggr_var)
{
variant_t v_rows;
v_rows.type = OG_TYPE_BIGINT;
v_rows.is_null = OG_FALSE;
v_rows.v_bigint = (int64)GET_AGGR_VAR_AVG(aggr_var)->ex_avg_count;
return opr_exec(OPER_TYPE_DIV, SESSION_NLS(ogsql_stmt->stmt), &aggr_var->var, &v_rows, &aggr_var->var);
}
static inline status_t sql_aggr_calc_cume_dist(aggr_assist_t *ogsql_stmt, aggr_var_t *aggr_var)
{
variant_t v_rows;
v_rows.type = OG_TYPE_BIGINT;
v_rows.is_null = OG_FALSE;
v_rows.v_bigint = (int64)GET_AGGR_VAR_AVG(aggr_var)->ex_avg_count + 1;
OG_RETURN_IFERR(var_as_bigint(&aggr_var->var));
aggr_var->var.v_bigint += 1;
return opr_exec(OPER_TYPE_DIV, SESSION_NLS(ogsql_stmt->stmt), &aggr_var->var, &v_rows, &aggr_var->var);
}
static inline status_t sql_aggr_calc_listagg(aggr_assist_t *ogsql_stmt, aggr_var_t *aggr_var)
{
if (ogsql_stmt->aggr_node->sort_items != NULL) {
return sql_hash_group_calc_listagg(ogsql_stmt->stmt, ogsql_stmt->cursor, ogsql_stmt->aggr_node, aggr_var,
&ogsql_stmt->cursor->exec_data.sort_concat);
}
return OG_SUCCESS;
}
static inline status_t sql_aggr_calc_median(aggr_assist_t *ogsql_stmt, aggr_var_t *aggr_var)
{
return sql_hash_group_calc_median(ogsql_stmt->stmt, ogsql_stmt->cursor, ogsql_stmt->aggr_node, aggr_var);
}
static status_t sql_aggr_calc_stddev(aggr_assist_t *ogsql_stmt, aggr_var_t *aggr_var)
{
aggr_stddev_t *aggr_stddev = GET_AGGR_VAR_STDDEV(aggr_var);
if (aggr_stddev->ex_count == 0) {
aggr_var->var.is_null = OG_TRUE;
return OG_SUCCESS;
}
if (aggr_stddev->ex_count == 1) {
switch (ogsql_stmt->aggr_type) {
case AGGR_TYPE_STDDEV:
case AGGR_TYPE_STDDEV_POP:
case AGGR_TYPE_VARIANCE:
case AGGR_TYPE_VAR_POP:
aggr_var->var.v_bigint = 0;
aggr_var->var.type = OG_TYPE_BIGINT;
break;
case AGGR_TYPE_STDDEV_SAMP:
case AGGR_TYPE_VAR_SAMP:
default:
aggr_var->var.is_null = OG_TRUE;
break;
}
return OG_SUCCESS;
}
variant_t v_rows;
variant_t v_rows_m;
variant_t tmp_result;
tmp_result.type = OG_TYPE_NUMBER;
tmp_result.is_null = OG_FALSE;
v_rows.type = OG_TYPE_BIGINT;
v_rows.is_null = OG_FALSE;
v_rows.v_bigint = (int64)aggr_stddev->ex_count;
v_rows_m.type = OG_TYPE_BIGINT;
v_rows_m.is_null = OG_FALSE;
v_rows_m.v_bigint = v_rows.v_bigint - 1;
* STDDEV:
* S = sqrt[ 1/(N-1) * ( sum(Xi^2) - 1/N * sum(Xi)^2 ) ]
*/
(void)opr_exec(OPER_TYPE_MUL, SESSION_NLS(ogsql_stmt->stmt), &aggr_stddev->extra, &aggr_stddev->extra,
&aggr_stddev->extra);
(void)opr_exec(OPER_TYPE_DIV, SESSION_NLS(ogsql_stmt->stmt), &aggr_stddev->extra, &v_rows,
&aggr_stddev->extra);
(void)opr_exec(OPER_TYPE_SUB, SESSION_NLS(ogsql_stmt->stmt), &aggr_var->var, &aggr_stddev->extra,
&aggr_var->var);
switch (ogsql_stmt->aggr_type) {
case AGGR_TYPE_STDDEV:
case AGGR_TYPE_STDDEV_SAMP:
case AGGR_TYPE_VARIANCE:
case AGGR_TYPE_VAR_SAMP:
OG_RETURN_IFERR(opr_exec(OPER_TYPE_DIV, SESSION_NLS(ogsql_stmt->stmt), &aggr_var->var, &v_rows_m,
&tmp_result));
break;
case AGGR_TYPE_STDDEV_POP:
case AGGR_TYPE_VAR_POP:
OG_RETURN_IFERR(opr_exec(OPER_TYPE_DIV, SESSION_NLS(ogsql_stmt->stmt), &aggr_var->var, &v_rows,
&tmp_result));
break;
default:
break;
}
OG_RETURN_IFERR(var_as_decimal(&aggr_var->var));
OG_RETURN_IFERR(var_as_decimal(&tmp_result));
if (IS_DEC8_NEG(&tmp_result.v_dec)) {
aggr_var->var.v_bigint = 0;
aggr_var->var.type = OG_TYPE_BIGINT;
return OG_SUCCESS;
}
if (ogsql_stmt->aggr_type == AGGR_TYPE_VARIANCE || ogsql_stmt->aggr_type == AGGR_TYPE_VAR_POP ||
ogsql_stmt->aggr_type == AGGR_TYPE_VAR_SAMP) {
aggr_var->var.v_dec = tmp_result.v_dec;
return OG_SUCCESS;
}
return cm_dec_sqrt(&tmp_result.v_dec, &aggr_var->var.v_dec);
}
static status_t sql_aggr_calc_corr(aggr_assist_t *ogsql_stmt, aggr_var_t *aggr_var)
{
sql_stmt_t *stmt = ogsql_stmt->stmt;
aggr_corr_t *aggr_corr = GET_AGGR_VAR_CORR(aggr_var);
if (aggr_corr->ex_count == 0 || aggr_corr->ex_count == 1) {
aggr_var->var.is_null = OG_TRUE;
return OG_SUCCESS;
}
variant_t v_rows;
variant_t tmp_result;
tmp_result.type = OG_TYPE_NUMBER;
tmp_result.is_null = OG_FALSE;
variant_t tmp_result_1;
tmp_result_1.type = OG_TYPE_NUMBER;
tmp_result_1.is_null = OG_FALSE;
variant_t tmp_result_2;
tmp_result_2.type = OG_TYPE_NUMBER;
tmp_result_2.is_null = OG_FALSE;
v_rows.type = OG_TYPE_BIGINT;
v_rows.is_null = OG_FALSE;
v_rows.v_bigint = (int64)aggr_corr->ex_count;
OG_RETURN_IFERR(opr_exec(OPER_TYPE_MUL, SESSION_NLS(stmt), &aggr_corr->extra[CORR_VAR_SUM_X],
&aggr_corr->extra[CORR_VAR_SUM_Y], &tmp_result));
OG_RETURN_IFERR(opr_exec(OPER_TYPE_DIV, SESSION_NLS(stmt), &tmp_result, &v_rows,
&tmp_result));
OG_RETURN_IFERR(opr_exec(OPER_TYPE_SUB, SESSION_NLS(stmt), &aggr_var->var, &tmp_result,
&aggr_var->var));
OG_RETURN_IFERR(opr_exec(OPER_TYPE_DIV, SESSION_NLS(stmt), &aggr_var->var, &v_rows, &tmp_result));
OG_RETURN_IFERR(opr_exec(OPER_TYPE_MUL, SESSION_NLS(stmt), &aggr_corr->extra[CORR_VAR_SUM_X],
&aggr_corr->extra[CORR_VAR_SUM_X], &tmp_result_1));
OG_RETURN_IFERR(opr_exec(OPER_TYPE_DIV, SESSION_NLS(stmt), &tmp_result_1, &v_rows,
&tmp_result_1));
OG_RETURN_IFERR(opr_exec(OPER_TYPE_SUB, SESSION_NLS(stmt), &aggr_corr->extra[CORR_VAR_SUM_XX], &tmp_result_1,
&aggr_var->var));
if (IS_DEC8_NEG(&aggr_var->var.v_dec)) {
aggr_var->var.is_null = OG_TRUE;
return OG_SUCCESS;
}
OG_RETURN_IFERR(opr_exec(OPER_TYPE_DIV, SESSION_NLS(stmt), &aggr_var->var, &v_rows, &aggr_var->var));
OG_RETURN_IFERR(cm_dec_sqrt(&aggr_var->var.v_dec, &tmp_result_1.v_dec));
OG_RETURN_IFERR(opr_exec(OPER_TYPE_MUL, SESSION_NLS(stmt), &aggr_corr->extra[CORR_VAR_SUM_Y],
&aggr_corr->extra[CORR_VAR_SUM_Y], &tmp_result_2));
OG_RETURN_IFERR(opr_exec(OPER_TYPE_DIV, SESSION_NLS(stmt), &tmp_result_2, &v_rows,
&tmp_result_2));
OG_RETURN_IFERR(opr_exec(OPER_TYPE_SUB, SESSION_NLS(stmt), &aggr_corr->extra[CORR_VAR_SUM_YY], &tmp_result_2,
&aggr_var->var));
if (IS_DEC8_NEG(&aggr_var->var.v_dec)) {
aggr_var->var.is_null = OG_TRUE;
return OG_SUCCESS;
}
OG_RETURN_IFERR(opr_exec(OPER_TYPE_DIV, SESSION_NLS(stmt), &aggr_var->var, &v_rows, &aggr_var->var));
OG_RETURN_IFERR(cm_dec_sqrt(&aggr_var->var.v_dec, &tmp_result_2.v_dec));
if (DECIMAL8_IS_ZERO(&tmp_result_1.v_dec) || DECIMAL8_IS_ZERO(&tmp_result_2.v_dec)) {
aggr_var->var.is_null = OG_TRUE;
return OG_SUCCESS;
}
OG_RETURN_IFERR(opr_exec(OPER_TYPE_MUL, SESSION_NLS(stmt), &tmp_result_1, &tmp_result_2,
&tmp_result_2));
OG_RETURN_IFERR(opr_exec(OPER_TYPE_DIV, SESSION_NLS(stmt), &tmp_result, &tmp_result_2,
&tmp_result));
OG_RETURN_IFERR(var_as_decimal(&aggr_var->var));
OG_RETURN_IFERR(var_as_decimal(&tmp_result));
aggr_var->var.v_dec = tmp_result.v_dec;
return OG_SUCCESS;
}
static status_t sql_aggr_calc_covar(aggr_assist_t *ogsql_stmt, aggr_var_t *aggr_var)
{
sql_stmt_t *stmt = ogsql_stmt->stmt;
aggr_covar_t *aggr_covar = GET_AGGR_VAR_COVAR(aggr_var);
if (aggr_covar->ex_count == 0) {
aggr_var->var.is_null = OG_TRUE;
return OG_SUCCESS;
}
if (aggr_covar->ex_count == 1) {
switch (ogsql_stmt->aggr_type) {
case AGGR_TYPE_COVAR_POP:
aggr_var->var.v_bigint = 0;
aggr_var->var.type = OG_TYPE_BIGINT;
return OG_SUCCESS;
case AGGR_TYPE_COVAR_SAMP:
default:
aggr_var->var.is_null = OG_TRUE;
return OG_SUCCESS;
}
}
variant_t v_rows, v_rows_m, tmp_result;
tmp_result.type = OG_TYPE_NUMBER;
tmp_result.is_null = OG_FALSE;
v_rows.type = OG_TYPE_BIGINT;
v_rows.is_null = OG_FALSE;
v_rows.v_bigint = (int64)aggr_covar->ex_count;
v_rows_m.type = OG_TYPE_BIGINT;
v_rows_m.is_null = OG_FALSE;
v_rows_m.v_bigint = v_rows.v_bigint - 1;
* COVAR_POP:
* S = 1/N * ( sum(Xi*Yi) - 1/N * sum(Xi)*sun(Yi) )
* COVAR_SAMP:
* S = 1/(N-1) * ( sum(Xi*Yi) - 1/N * sum(Xi)*sun(Yi) )
*/
(void)opr_exec(OPER_TYPE_MUL, SESSION_NLS(stmt), &aggr_covar->extra, &aggr_covar->extra_1,
&aggr_covar->extra);
(void)opr_exec(OPER_TYPE_DIV, SESSION_NLS(stmt), &aggr_covar->extra, &v_rows,
&aggr_covar->extra);
(void)opr_exec(OPER_TYPE_SUB, SESSION_NLS(stmt), &aggr_var->var, &aggr_covar->extra,
&aggr_var->var);
switch (ogsql_stmt->aggr_type) {
case AGGR_TYPE_COVAR_SAMP:
OG_RETURN_IFERR(opr_exec(OPER_TYPE_DIV, SESSION_NLS(stmt), &aggr_var->var, &v_rows_m, &tmp_result));
break;
case AGGR_TYPE_COVAR_POP:
OG_RETURN_IFERR(opr_exec(OPER_TYPE_DIV, SESSION_NLS(stmt), &aggr_var->var, &v_rows, &tmp_result));
break;
default:
break;
}
aggr_var->var.v_dec = tmp_result.v_dec;
return OG_SUCCESS;
}
status_t sql_exec_aggr(sql_stmt_t *stmt, sql_cursor_t *cursor, galist_t *aggrs, plan_node_t *plan)
{
const sql_func_t *func = NULL;
aggr_var_t *aggr_var = NULL;
aggr_assist_t ogsql_stmt;
SQL_INIT_AGGR_ASSIST(&ogsql_stmt, stmt, cursor);
cursor->exec_data.sort_concat.len = 0;
for (uint32 i = 0; i < aggrs->count; i++) {
ogsql_stmt.aggr_node = (expr_node_t *)cm_galist_get(aggrs, i);
func = GET_AGGR_FUNC(ogsql_stmt.aggr_node);
if (func->aggr_type == AGGR_TYPE_SUM || func->aggr_type == AGGR_TYPE_COUNT ||
func->aggr_type == AGGR_TYPE_MIN || func->aggr_type == AGGR_TYPE_MAX ||
func->aggr_type == AGGR_TYPE_ARRAY_AGG || func->aggr_type == AGGR_TYPE_RANK) {
continue;
}
aggr_var = sql_get_aggr_addr(cursor, i);
if (aggr_var->var.is_null) {
continue;
}
ogsql_stmt.aggr_type = func->aggr_type;
OG_RETURN_IFERR(sql_aggr_calc_value(&ogsql_stmt, aggr_var));
}
return OG_SUCCESS;
}
status_t sql_exec_aggr_extra(sql_stmt_t *stmt, sql_cursor_t *cursor, galist_t *aggrs, plan_node_t *plan)
{
uint32 i;
expr_node_t *aggr_node = NULL;
const sql_func_t *func = NULL;
aggr_var_t *aggr_var = NULL;
aggr_dense_rank_t *aggr_dense_rank = NULL;
for (i = 0; i < aggrs->count; i++) {
aggr_node = (expr_node_t *)cm_galist_get(aggrs, i);
func = GET_AGGR_FUNC(aggr_node);
sql_aggr_type_t func_type = func->aggr_type;
aggr_var = sql_get_aggr_addr(cursor, i);
switch (func_type) {
case AGGR_TYPE_DENSE_RANK:
if (!aggr_var->var.is_null) {
aggr_dense_rank = GET_AGGR_VAR_DENSE_RANK(aggr_var);
vm_hash_segment_deinit(&aggr_dense_rank->hash_segment);
aggr_dense_rank->table_entry.vmid = OG_INVALID_ID32;
aggr_dense_rank->table_entry.offset = OG_INVALID_ID32;
}
case AGGR_TYPE_RANK:
case AGGR_TYPE_CUME_DIST:
aggr_var->var.is_null = OG_FALSE;
break;
default:
break;
}
}
return OG_SUCCESS;
}
status_t sql_aggr_get_cntdis_value(sql_stmt_t *stmt, galist_t *cntdis_columns, expr_node_t *aggr_node, variant_t *value)
{
expr_node_t *cntdis_column = NULL;
uint32 group_id = aggr_node->dis_info.group_id;
if (value->v_bigint == 0 || group_id == OG_INVALID_ID32) {
value->is_null = OG_TRUE;
return OG_SUCCESS;
}
cntdis_column = (expr_node_t *)cm_galist_get(cntdis_columns, group_id);
OG_RETURN_IFERR(sql_exec_expr_node(stmt, cntdis_column, value));
if (!value->is_null && OG_IS_LOB_TYPE(value->type)) {
OG_SRC_THROW_ERROR(aggr_node->argument->loc, ERR_SQL_SYNTAX_ERROR, "unexpected lob column occurs");
return OG_ERROR;
}
return OG_SUCCESS;
}
static inline bool32 sql_aggr_is_nullaware(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
expr_node_t *aggr_node = NULL;
sql_func_t *func = NULL;
if (plan->aggr.items->count == 1) {
aggr_node = (expr_node_t *)cm_galist_get(plan->aggr.items, 0);
func = sql_get_func(&aggr_node->value.v_func);
if (aggr_node->nullaware && (func->aggr_type == AGGR_TYPE_MIN || func->aggr_type == AGGR_TYPE_MAX)) {
return OG_TRUE;
}
}
return OG_FALSE;
}
static inline bool32 sql_aggr_cond_is_empty(cond_tree_t *cond)
{
return (bool32)(cond == NULL || cond->root == NULL || cond->root->type == COND_NODE_TRUE);
}
static bool32 sql_aggr_is_count_star(expr_node_t *aggr_node)
{
const sql_func_t *func = GET_AGGR_FUNC(aggr_node);
return (bool32)(func->aggr_type == AGGR_TYPE_COUNT &&
!aggr_node->dis_info.need_distinct &&
aggr_node->argument != NULL &&
aggr_node->argument->next == NULL &&
aggr_node->argument->root != NULL &&
aggr_node->argument->root->type == EXPR_NODE_STAR);
}
static bool32 sql_aggr_cond_has_external_ref(cond_node_t *cond, uint32 table_id)
{
if (cond == NULL) {
return OG_FALSE;
}
cols_used_t cols_used;
init_cols_used(&cols_used);
sql_collect_cols_in_cond(cond, &cols_used);
if (HAS_DYNAMIC_SUBSLCT(&cols_used) || HAS_ROWNUM(&cols_used) || HAS_PRIOR(&cols_used) ||
HAS_PRNT_OR_ANCSTR_COLS(cols_used.flags)) {
return OG_TRUE;
}
biqueue_t *cols_que = &cols_used.cols_que[SELF_IDX];
biqueue_node_t *curr_node = biqueue_first(cols_que);
biqueue_node_t *end_node = biqueue_end(cols_que);
while (curr_node != end_node) {
expr_node_t *col = OBJECT_OF(expr_node_t, curr_node);
if (TAB_OF_NODE(col) != table_id) {
return OG_TRUE;
}
curr_node = curr_node->next;
}
return OG_FALSE;
}
static bool32 sql_aggr_has_param_scan(plan_node_t *plan)
{
if (plan == NULL) {
return OG_FALSE;
}
switch (plan->type) {
case PLAN_NODE_SCAN: {
sql_table_t *table = plan->scan_p.table;
* Parameterized rowid/index scans depend on a row produced outside
* this subtree. They may leave the NL join node's cond/filter empty,
* so they must not be counted by multiplying independent child rows.
*/
return (bool32)(table != NULL &&
(table->scan_mode == SCAN_MODE_ROWID ||
(table->index != NULL && (table->col_use_flag & USE_SELF_JOIN_COL)) ||
sql_aggr_cond_has_external_ref(table->cond == NULL ? NULL : table->cond->root, table->id)));
}
case PLAN_NODE_JOIN:
return (bool32)(sql_aggr_has_param_scan(plan->join_p.left) ||
sql_aggr_has_param_scan(plan->join_p.right));
case PLAN_NODE_CONCATE:
if (plan->cnct_p.plans == NULL) {
return OG_FALSE;
}
for (uint32 i = 0; i < plan->cnct_p.plans->count; i++) {
if (sql_aggr_has_param_scan((plan_node_t *)cm_galist_get(plan->cnct_p.plans, i))) {
return OG_TRUE;
}
}
return OG_FALSE;
default:
return OG_FALSE;
}
}
static bool32 sql_aggr_plan_has_table(plan_node_t *plan, uint32 table_id)
{
if (plan == NULL) {
return OG_FALSE;
}
switch (plan->type) {
case PLAN_NODE_SCAN:
return (bool32)(plan->scan_p.table != NULL && plan->scan_p.table->id == table_id);
case PLAN_NODE_JOIN:
return (bool32)(sql_aggr_plan_has_table(plan->join_p.left, table_id) ||
sql_aggr_plan_has_table(plan->join_p.right, table_id));
case PLAN_NODE_CONCATE:
if (plan->cnct_p.plans == NULL) {
return OG_FALSE;
}
for (uint32 i = 0; i < plan->cnct_p.plans->count; i++) {
if (sql_aggr_plan_has_table((plan_node_t *)cm_galist_get(plan->cnct_p.plans, i), table_id)) {
return OG_TRUE;
}
}
return OG_FALSE;
default:
return OG_FALSE;
}
}
static bool32 sql_aggr_cols_refs_outside_plan(cols_used_t *cols_used, plan_node_t *plan)
{
if (HAS_DYNAMIC_SUBSLCT(cols_used) || HAS_ROWNUM(cols_used) || HAS_PRIOR(cols_used) ||
HAS_PRNT_OR_ANCSTR_COLS(cols_used->flags)) {
return OG_TRUE;
}
biqueue_t *cols_que = &cols_used->cols_que[SELF_IDX];
biqueue_node_t *curr_node = biqueue_first(cols_que);
biqueue_node_t *end_node = biqueue_end(cols_que);
while (curr_node != end_node) {
expr_node_t *col = OBJECT_OF(expr_node_t, curr_node);
if (!sql_aggr_plan_has_table(plan, TAB_OF_NODE(col))) {
return OG_TRUE;
}
curr_node = curr_node->next;
}
return OG_FALSE;
}
static bool32 sql_aggr_cond_refs_outside_plan(cond_tree_t *cond, plan_node_t *plan)
{
if (sql_aggr_cond_is_empty(cond)) {
return OG_FALSE;
}
cols_used_t cols_used;
init_cols_used(&cols_used);
sql_collect_cols_in_cond(cond->root, &cols_used);
return sql_aggr_cols_refs_outside_plan(&cols_used, plan);
}
static bool32 sql_aggr_expr_refs_outside_plan(expr_tree_t *expr, plan_node_t *plan)
{
if (expr == NULL) {
return OG_FALSE;
}
cols_used_t cols_used;
init_cols_used(&cols_used);
sql_collect_cols_in_expr_tree(expr, &cols_used);
return sql_aggr_cols_refs_outside_plan(&cols_used, plan);
}
static bool32 sql_aggr_scan_refs_outside_plan(sql_table_t *table, plan_node_t *plan)
{
if (table == NULL) {
return OG_FALSE;
}
if (sql_aggr_cond_refs_outside_plan(table->cond, plan)) {
return OG_TRUE;
}
switch (table->type) {
case JSON_TABLE:
return (bool32)(table->json_table_info != NULL &&
sql_aggr_expr_refs_outside_plan(table->json_table_info->data_expr, plan));
case FUNC_AS_TABLE:
return sql_aggr_expr_refs_outside_plan(table->func.args, plan);
default:
return OG_FALSE;
}
}
static bool32 sql_aggr_plan_refs_outside_plan(plan_node_t *node, plan_node_t *plan)
{
if (node == NULL) {
return OG_FALSE;
}
switch (node->type) {
case PLAN_NODE_SCAN:
return sql_aggr_scan_refs_outside_plan(node->scan_p.table, plan);
case PLAN_NODE_JOIN:
return (bool32)(sql_aggr_plan_refs_outside_plan(node->join_p.left, plan) ||
sql_aggr_plan_refs_outside_plan(node->join_p.right, plan));
case PLAN_NODE_CONCATE:
if (node->cnct_p.plans == NULL) {
return OG_FALSE;
}
for (uint32 i = 0; i < node->cnct_p.plans->count; i++) {
if (sql_aggr_plan_refs_outside_plan((plan_node_t *)cm_galist_get(node->cnct_p.plans, i), plan)) {
return OG_TRUE;
}
}
return OG_FALSE;
default:
return OG_FALSE;
}
}
static bool32 sql_aggr_plan_can_fetch_independently(sql_cursor_t *cursor, plan_node_t *plan)
{
if (cursor == NULL || cursor->query == NULL) {
return (bool32)(!sql_aggr_plan_refs_outside_plan(plan, plan));
}
return (bool32)(!sql_aggr_cond_refs_outside_plan(cursor->query->cond, plan) &&
!sql_aggr_plan_refs_outside_plan(plan, plan));
}
static bool32 sql_aggr_can_fast_count_cartesian(plan_node_t *plan)
{
if (plan == NULL || plan->type != PLAN_NODE_JOIN) {
return OG_FALSE;
}
if (plan->join_p.oper != JOIN_OPER_NL) {
return OG_FALSE;
}
if (!sql_aggr_cond_is_empty(plan->join_p.cond) || !sql_aggr_cond_is_empty(plan->join_p.filter)) {
return OG_FALSE;
}
return (bool32)(!sql_aggr_has_param_scan(plan->join_p.right));
}
static status_t sql_aggr_try_count_plan_rows(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan, uint64 *rows,
bool32 *counted);
static status_t sql_aggr_count_plan_rows_by_fetch(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan,
uint64 *rows)
{
sql_cursor_t *count_cursor = NULL;
bool32 eof = OG_FALSE;
status_t status = OG_SUCCESS;
OG_RETURN_IFERR(sql_alloc_cursor(stmt, &count_cursor));
count_cursor->scn = cursor->scn;
count_cursor->ancestor_ref = cursor->ancestor_ref;
if (sql_open_query_cursor(stmt, count_cursor, cursor->query) != OG_SUCCESS) {
sql_free_cursor(stmt, count_cursor);
return OG_ERROR;
}
if (SQL_CURSOR_PUSH(stmt, count_cursor) != OG_SUCCESS) {
sql_free_cursor(stmt, count_cursor);
return OG_ERROR;
}
status = sql_execute_query_plan(stmt, count_cursor, plan);
while (status == OG_SUCCESS) {
OGSQL_SAVE_STACK(stmt);
status = sql_fetch_query(stmt, count_cursor, plan, &eof);
OGSQL_RESTORE_STACK(stmt);
if (status != OG_SUCCESS || eof) {
break;
}
(*rows)++;
}
if (sql_free_query_mtrl(stmt, count_cursor, plan) != OG_SUCCESS && status == OG_SUCCESS) {
status = OG_ERROR;
}
SQL_CURSOR_POP(stmt);
sql_free_cursor(stmt, count_cursor);
return status;
}
static status_t sql_aggr_try_count_plan_rows(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan, uint64 *rows,
bool32 *counted)
{
*counted = OG_FALSE;
if (!sql_aggr_can_fast_count_cartesian(plan)) {
if (!sql_aggr_plan_can_fetch_independently(cursor, plan)) {
return OG_SUCCESS;
}
*rows = 0;
OG_RETURN_IFERR(sql_aggr_count_plan_rows_by_fetch(stmt, cursor, plan, rows));
*counted = OG_TRUE;
return OG_SUCCESS;
}
uint64 left_rows = 0;
uint64 right_rows = 0;
bool32 left_counted = OG_FALSE;
bool32 right_counted = OG_FALSE;
OG_RETURN_IFERR(sql_aggr_try_count_plan_rows(stmt, cursor, plan->join_p.left, &left_rows, &left_counted));
if (!left_counted) {
return OG_SUCCESS;
}
OG_RETURN_IFERR(sql_aggr_try_count_plan_rows(stmt, cursor, plan->join_p.right, &right_rows, &right_counted));
if (!right_counted) {
return OG_SUCCESS;
}
if (left_rows != 0 && right_rows > (uint64)OG_MAX_INT64 / left_rows) {
OG_THROW_ERROR(ERR_TYPE_OVERFLOW, "BIGINT");
return OG_ERROR;
}
*rows = left_rows * right_rows;
*counted = OG_TRUE;
return OG_SUCCESS;
}
static inline bool32 sql_aggr_may_try_fast_count_cartesian(plan_node_t *plan, bool32 par_exe_flag, uint32 aggr_cnt)
{
if (par_exe_flag || aggr_cnt != 1 || plan == NULL || plan->aggr.next == NULL ||
plan->aggr.next->type != PLAN_NODE_JOIN) {
return OG_FALSE;
}
return (bool32)(plan->aggr.next->join_p.oper == JOIN_OPER_NL);
}
static status_t sql_aggr_try_fast_count_cartesian(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan,
bool32 par_exe_flag, bool32 *optimized)
{
expr_node_t *aggr_node = NULL;
aggr_var_t *aggr_var = NULL;
uint64 rows = 0;
bool32 counted = OG_FALSE;
*optimized = OG_FALSE;
if (par_exe_flag || plan->aggr.items->count != 1 || !sql_aggr_can_fast_count_cartesian(plan->aggr.next)) {
return OG_SUCCESS;
}
aggr_node = (expr_node_t *)cm_galist_get(plan->aggr.items, 0);
if (!sql_aggr_is_count_star(aggr_node)) {
return OG_SUCCESS;
}
* COUNT(*) over an unqualified Cartesian nested loop only needs the child
* cardinalities. Counting child rows exactly avoids materializing every
* product row before the aggregate consumes it.
*/
OG_RETURN_IFERR(sql_aggr_try_count_plan_rows(stmt, cursor, plan->aggr.next, &rows, &counted));
if (!counted) {
return OG_SUCCESS;
}
aggr_var = sql_get_aggr_addr(cursor, 0);
aggr_var->var.type = OG_TYPE_BIGINT;
aggr_var->var.is_null = OG_FALSE;
aggr_var->var.v_bigint = (int64)rows;
*optimized = OG_TRUE;
return sql_exec_aggr_extra(stmt, cursor, plan->aggr.items, plan);
}
status_t sql_mtrl_aggr(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan, bool32 par_exe_flag)
{
uint32 i;
uint32 avgs;
uint32 rows;
bool32 optimized = OG_FALSE;
variant_t *value = NULL;
const sql_func_t *func = NULL;
uint32 aggr_cnt = plan->aggr.items->count;
aggr_assist_t ogsql_stmt;
SQL_INIT_AGGR_ASSIST(&ogsql_stmt, stmt, cursor);
cursor->mtrl.cursor.type = MTRL_CURSOR_OTHERS;
OG_RETURN_IFERR(sql_init_aggr_page(stmt, cursor, plan->aggr.items));
OG_RETURN_IFERR(sql_init_aggr_values(stmt, cursor, plan->aggr.next, plan->aggr.items, &avgs));
if (sql_aggr_may_try_fast_count_cartesian(plan, par_exe_flag, aggr_cnt)) {
* Keep ordinary aggregates and outer joins on the original hot path.
* The fast-count probe is only useful when the top child can be a pure
* Cartesian nested loop.
*/
OG_RETURN_IFERR(sql_aggr_try_fast_count_cartesian(stmt, cursor, plan, par_exe_flag, &optimized));
if (optimized) {
return OG_SUCCESS;
}
}
OG_RETURN_IFERR(sql_execute_query_plan(stmt, cursor, plan->aggr.next));
if (cursor->eof) {
if (sql_aggr_is_nullaware(stmt, cursor, plan)) {
cursor->mtrl.aggr_fetched = OG_TRUE;
}
return OG_SUCCESS;
}
uint32 alloc_size = sizeof(variant_t) * (FO_VAL_MAX - 1) + sizeof(expr_node_t *) * aggr_cnt;
OG_RETURN_IFERR(sql_push(stmt, alloc_size, (void **)&value));
expr_node_t **aggr_nodes = (expr_node_t **)((char *)value + sizeof(variant_t) * (FO_VAL_MAX - 1));
for (i = 0; i < aggr_cnt; i++) {
aggr_nodes[i] = (expr_node_t *)cm_galist_get(plan->aggr.items, i);
}
OGSQL_SAVE_STACK(stmt);
OG_RETURN_IFERR(sql_fetch_query(stmt, cursor, plan->aggr.next, &cursor->eof));
rows = 0;
while (!cursor->eof) {
rows++;
for (i = 0; i < aggr_cnt; i++) {
ogsql_stmt.avg_count = 1;
ogsql_stmt.aggr_node = aggr_nodes[i];
func = GET_AGGR_FUNC(ogsql_stmt.aggr_node);
OG_RETURN_IFERR(sql_exec_expr_node(stmt, AGGR_VALUE_NODE(func, ogsql_stmt.aggr_node), value));
if (OG_IS_LOB_TYPE(value->type)) {
OG_RETURN_IFERR(sql_get_lob_value(stmt, value));
}
if (ogsql_stmt.aggr_node->dis_info.need_distinct && func->aggr_type == AGGR_TYPE_COUNT) {
OG_RETURN_IFERR(sql_aggr_get_cntdis_value(stmt, plan->aggr.cntdis_columns, ogsql_stmt.aggr_node,
value));
}
ogsql_stmt.aggr_type = func->aggr_type;
OG_RETURN_IFERR(sql_aggr_value(&ogsql_stmt, i, value));
}
OGSQL_RESTORE_STACK(stmt);
OG_RETURN_IFERR(sql_fetch_query(stmt, cursor, plan->aggr.next, &cursor->eof));
}
OGSQL_RESTORE_STACK(stmt);
OGSQL_POP(stmt);
OG_RETURN_IFERR(sql_free_query_mtrl(stmt, cursor, plan->aggr.next));
if ((avgs > 0 && rows > 0) && (par_exe_flag == OG_FALSE)) {
OG_RETURN_IFERR(sql_exec_aggr(stmt, cursor, plan->aggr.items, plan));
}
if (rows == 0 && sql_aggr_is_nullaware(stmt, cursor, plan)) {
cursor->mtrl.aggr_fetched = OG_TRUE;
}
return sql_exec_aggr_extra(stmt, cursor, plan->aggr.items, plan);
}
static status_t sql_mtrl_index_aggr(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
uint32 avgs;
uint32 rows = 0;
bool32 result = OG_FALSE;
variant_t *value = NULL;
const sql_func_t *func = NULL;
aggr_assist_t ass;
SQL_INIT_AGGR_ASSIST(&ass, stmt, cursor);
if (plan->aggr.next->type != PLAN_NODE_SCAN) {
return OG_ERROR;
}
cursor->mtrl.cursor.type = MTRL_CURSOR_OTHERS;
OG_RETURN_IFERR(sql_init_aggr_page(stmt, cursor, plan->aggr.items));
OG_RETURN_IFERR(sql_init_aggr_values(stmt, cursor, plan->aggr.next, plan->aggr.items, &avgs));
OG_RETURN_IFERR(sql_execute_scan(stmt, cursor, plan->aggr.next));
sql_table_t *table = plan->aggr.next->scan_p.table;
sql_table_cursor_t *tab_cur = &cursor->tables[table->id];
for (;;) {
OG_RETURN_IFERR(sql_fetch_one_part(stmt, tab_cur, table));
if (!tab_cur->knl_cur->eof) {
rows++;
ass.aggr_node = (expr_node_t *)cm_galist_get(plan->aggr.items, 0);
func = GET_AGGR_FUNC(ass.aggr_node);
OG_RETURN_IFERR(sql_push(stmt, func->value_cnt * sizeof(variant_t), (void **)&value));
if (sql_exec_expr_node(stmt, AGGR_VALUE_NODE(func, ass.aggr_node), value) != OG_SUCCESS) {
OGSQL_POP(stmt);
return OG_ERROR;
}
ass.aggr_type = func->aggr_type;
ass.avg_count = 0;
if (sql_aggr_value(&ass, (uint32)0, value) != OG_SUCCESS) {
OGSQL_POP(stmt);
return OG_ERROR;
}
OGSQL_POP(stmt);
}
OG_RETURN_IFERR(sql_try_switch_part(stmt, tab_cur, table, &result));
if (!result) {
break;
}
}
if (rows == 0 && sql_aggr_is_nullaware(stmt, cursor, plan)) {
cursor->mtrl.aggr_fetched = OG_TRUE;
}
return OG_SUCCESS;
}
void clean_mtrl_seg(sql_stmt_t *stmt, sql_cursor_t *cursor)
{
mtrl_close_segment(&stmt->mtrl, cursor->mtrl.aggr);
mtrl_release_segment(&stmt->mtrl, cursor->mtrl.aggr);
cursor->mtrl.aggr = OG_INVALID_ID32;
}
status_t sql_execute_aggr(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
#ifdef TIME_STATISTIC
clock_t start;
double timeuse;
start = cm_cal_time_bengin();
#endif
OG_RETURN_IFERR(mtrl_create_segment(&stmt->mtrl, MTRL_SEGMENT_AGGR, NULL, &cursor->mtrl.aggr));
OG_RETURN_IFERR(mtrl_open_segment(&stmt->mtrl, cursor->mtrl.aggr));
OG_RETURN_IFERR(SQL_CURSOR_PUSH(stmt, cursor));
if (sql_mtrl_aggr(stmt, cursor, plan, cursor->par_ctx.par_exe_flag) != OG_SUCCESS) {
SQL_CURSOR_POP(stmt);
clean_mtrl_seg(stmt, cursor);
return OG_ERROR;
}
SQL_CURSOR_POP(stmt);
cursor->eof = OG_FALSE;
#ifdef TIME_STATISTIC
timeuse = cm_cal_time_end(start);
stmt->mt_time += timeuse;
#endif
return OG_SUCCESS;
}
status_t sql_execute_index_aggr(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
if (mtrl_create_segment(&stmt->mtrl, MTRL_SEGMENT_AGGR, NULL, &cursor->mtrl.aggr) != OG_SUCCESS) {
return OG_ERROR;
}
if (mtrl_open_segment(&stmt->mtrl, cursor->mtrl.aggr) != OG_SUCCESS) {
return OG_ERROR;
}
OG_RETURN_IFERR(SQL_CURSOR_PUSH(stmt, cursor));
if (sql_mtrl_index_aggr(stmt, cursor, plan) != OG_SUCCESS) {
SQL_CURSOR_POP(stmt);
return OG_ERROR;
}
SQL_CURSOR_POP(stmt);
cursor->eof = OG_FALSE;
return OG_SUCCESS;
}
static inline status_t sql_aggr_reset_default(aggr_var_t *aggr_var)
{
if (!OG_IS_VARLEN_TYPE(aggr_var->var.type)) {
MEMS_RETURN_IFERR(memset_s(&aggr_var->var, sizeof(variant_t), 0, sizeof(variant_t)));
} else {
aggr_var->var.type = 0;
aggr_var->var.is_null = OG_FALSE;
aggr_var->var.v_text.len = 0;
aggr_var->var.v_bin.size = 0;
}
return OG_SUCCESS;
}
static inline status_t sql_aggr_reset_median(aggr_var_t *aggr_var)
{
aggr_var->var.is_null = OG_TRUE;
GET_AGGR_VAR_MEDIAN(aggr_var)->median_count = 0;
GET_AGGR_VAR_MEDIAN(aggr_var)->sort_rid.vmid = OG_INVALID_ID32;
GET_AGGR_VAR_MEDIAN(aggr_var)->sort_rid.slot = OG_INVALID_ID32;
return sql_aggr_reset_default(aggr_var);
}
static inline status_t sql_aggr_reset_avg(aggr_var_t *aggr_var)
{
aggr_var->var.v_bigint = 0;
GET_AGGR_VAR_AVG(aggr_var)->ex_avg_count = 0;
return sql_aggr_reset_default(aggr_var);
}
static inline status_t sql_aggr_reset_rank(aggr_var_t *aggr_var)
{
aggr_var->var.is_null = OG_TRUE;
aggr_var->var.type = OG_TYPE_INTEGER;
VALUE(uint32, &aggr_var->var) = 1;
return sql_aggr_reset_default(aggr_var);
}
static inline status_t sql_aggr_reset_min_max(aggr_var_t *aggr_var)
{
if (OG_IS_VARLEN_TYPE(aggr_var->var.type)) {
aggr_str_t *aggr_str = GET_AGGR_VAR_STR(aggr_var);
aggr_str->str_result.vmid = OG_INVALID_ID32;
aggr_str->str_result.slot = OG_INVALID_ID32;
}
return sql_aggr_reset_default(aggr_var);
}
static inline status_t sql_aggr_reset_listagg(aggr_var_t *aggr_var)
{
aggr_group_concat_t *aggr_group = GET_AGGR_VAR_GROUPCONCAT(aggr_var);
aggr_group->total_len = 0;
aggr_group->aggr_str.str_result.vmid = OG_INVALID_ID32;
aggr_group->aggr_str.str_result.slot = OG_INVALID_ID32;
aggr_group->sort_rid.vmid = OG_INVALID_ID32;
aggr_group->sort_rid.slot = OG_INVALID_ID32;
return sql_aggr_reset_default(aggr_var);
}
static inline status_t sql_aggr_reset_array_agg(aggr_var_t *aggr_var)
{
aggr_var->var.type = OG_TYPE_ARRAY;
cm_reset_vm_lob(&aggr_var->var.v_array.value.vm_lob);
return sql_aggr_reset_default(aggr_var);
}
static inline status_t sql_aggr_reset_covar(aggr_var_t *aggr_var)
{
aggr_covar_t *aggr_covar = GET_AGGR_VAR_COVAR(aggr_var);
aggr_covar->ex_count = 0;
MEMS_RETURN_IFERR(memset_s(&aggr_covar->extra, sizeof(variant_t), 0, sizeof(variant_t)));
MEMS_RETURN_IFERR(memset_s(&aggr_covar->extra_1, sizeof(variant_t), 0, sizeof(variant_t)));
return sql_aggr_reset_default(aggr_var);
}
static inline status_t sql_aggr_reset_corr(aggr_var_t *aggr_var)
{
aggr_corr_t *corr = GET_AGGR_VAR_CORR(aggr_var);
MEMS_RETURN_IFERR(memset_s(&corr->extra, sizeof(corr->extra), 0, sizeof(corr->extra)));
return sql_aggr_reset_default(aggr_var);
}
static inline status_t sql_aggr_reset_stddev(aggr_var_t *aggr_var)
{
aggr_stddev_t *aggr_stddev = GET_AGGR_VAR_STDDEV(aggr_var);
aggr_stddev->ex_count = 0;
MEMS_RETURN_IFERR(memset_s(&aggr_stddev->extra, sizeof(variant_t), 0, sizeof(variant_t)));
return sql_aggr_reset_default(aggr_var);
}
static inline status_t sql_aggr_init_count(aggr_assist_t *ogsql_stmt, aggr_var_t *aggr_var)
{
aggr_var->var.type = OG_TYPE_BIGINT;
aggr_var->var.is_null = OG_FALSE;
aggr_var->var.v_bigint = 0;
return OG_SUCCESS;
}
static status_t ogsql_aggr_init_type_buf(vmc_t *vmc, galist_t *sort_items, char **type_buf)
{
if (sort_items == NULL || *type_buf != NULL) {
return OG_SUCCESS;
}
return sql_sort_mtrl_record_types(vmc, MTRL_SEGMENT_CONCAT_SORT, sort_items, type_buf);
}
static inline status_t sql_aggr_init_median(aggr_assist_t *ogsql_stmt, aggr_var_t *aggr_var)
{
aggr_var->var.is_null = OG_TRUE;
aggr_var->var.type = ogsql_stmt->aggr_node->datatype;
GET_AGGR_VAR_MEDIAN(aggr_var)->median_count = 0;
GET_AGGR_VAR_MEDIAN(aggr_var)->sort_rid.vmid = OG_INVALID_ID32;
GET_AGGR_VAR_MEDIAN(aggr_var)->sort_rid.slot = OG_INVALID_ID32;
ogsql_stmt->avg_count++;
return ogsql_aggr_init_type_buf(&ogsql_stmt->cursor->vmc, ogsql_stmt->aggr_node->sort_items,
&(GET_AGGR_VAR_MEDIAN(aggr_var)->type_buf));
}
static status_t sql_aggr_init_rank(aggr_assist_t *aggr_ass, aggr_var_t *aggr_var)
{
aggr_var->var.is_null = OG_TRUE;
aggr_var->var.type = OG_TYPE_INTEGER;
VALUE(uint32, &aggr_var->var) = 1;
return OG_SUCCESS;
}
static inline status_t sql_aggr_init_gc_sort_data(sql_cursor_t *cursor)
{
text_buf_t *sort_concat = &cursor->exec_data.sort_concat;
if (cursor->exec_data.sort_concat.str == NULL) {
OG_RETURN_IFERR(vmc_alloc(&cursor->vmc, OG_MAX_ROW_SIZE, (void **)&sort_concat->str));
sort_concat->max_size = OG_MAX_ROW_SIZE;
sort_concat->len = 0;
}
return OG_SUCCESS;
}
static status_t sql_aggr_init_listagg(aggr_assist_t *ass, aggr_var_t *aggr_var)
{
aggr_group_concat_t *aggr_group = GET_AGGR_VAR_GROUPCONCAT(aggr_var);
aggr_group->sort_rid.slot = OG_INVALID_ID32;
aggr_group->sort_rid.vmid = OG_INVALID_ID32;
aggr_group->total_len = 0;
aggr_group->aggr_str.str_result.vmid = OG_INVALID_ID32;
aggr_group->aggr_str.str_result.slot = OG_INVALID_ID32;
aggr_var->var.type = OG_TYPE_STRING;
aggr_var->var.is_null = OG_TRUE;
aggr_var->var.v_text.len = 0;
if (ass->aggr_node->sort_items != NULL) {
ass->avg_count++;
}
OG_RETURN_IFERR(ogsql_aggr_init_type_buf(&ass->cursor->vmc, ass->aggr_node->sort_items, &aggr_group->type_buf));
return sql_aggr_init_gc_sort_data(ass->cursor);
}
static inline status_t sql_aggr_init_array_agg(aggr_assist_t *ass, aggr_var_t *aggr_var)
{
array_assist_t array_a;
id_list_t *vm_list = sql_get_exec_lob_list(ass->stmt);
aggr_var->var.is_null = OG_FALSE;
aggr_var->var.type = OG_TYPE_ARRAY;
aggr_var->var.v_array.count = 0;
aggr_var->var.v_array.value.type = OG_LOB_FROM_VMPOOL;
aggr_var->var.v_array.type = ass->aggr_node->typmod.datatype;
return array_init(&array_a, KNL_SESSION(ass->stmt), ass->stmt->mtrl.pool, vm_list,
&aggr_var->var.v_array.value.vm_lob);
}
static inline status_t sql_aggr_init_stddev(aggr_assist_t *ass, aggr_var_t *aggr_var)
{
aggr_var->var.is_null = OG_TRUE;
aggr_var->var.type = ass->aggr_node->datatype;
aggr_stddev_t *aggr_stddev = GET_AGGR_VAR_STDDEV(aggr_var);
aggr_stddev->extra.is_null = OG_TRUE;
aggr_stddev->ex_count = 0;
ass->avg_count++;
return OG_SUCCESS;
}
static inline status_t sql_aggr_init_covar(aggr_assist_t *ass, aggr_var_t *aggr_var)
{
aggr_covar_t *aggr_covar = GET_AGGR_VAR_COVAR(aggr_var);
aggr_covar->extra.is_null = OG_TRUE;
aggr_covar->extra_1.is_null = OG_TRUE;
aggr_covar->ex_count = 0;
aggr_var->var.is_null = OG_TRUE;
aggr_var->var.type = OG_TYPE_NUMBER;
ass->avg_count++;
return OG_SUCCESS;
}
static inline status_t sql_aggr_init_corr(aggr_assist_t *ass, aggr_var_t *aggr_var)
{
aggr_var->var.is_null = OG_TRUE;
aggr_var->var.type = ass->aggr_node->datatype;
aggr_corr_t *aggr_corr = GET_AGGR_VAR_CORR(aggr_var);
aggr_corr->extra[CORR_VAR_SUM_X].is_null = OG_TRUE;
aggr_corr->extra[CORR_VAR_SUM_Y].is_null = OG_TRUE;
aggr_corr->extra[CORR_VAR_SUM_XX].is_null = OG_TRUE;
aggr_corr->extra[CORR_VAR_SUM_YY].is_null = OG_TRUE;
aggr_corr->ex_count = 0;
ass->avg_count++;
return OG_SUCCESS;
}
static inline status_t sql_aggr_init_cume_dist(aggr_assist_t *ass, aggr_var_t *aggr_var)
{
GET_AGGR_VAR_AVG(aggr_var)->ex_avg_count = 0;
aggr_var->var.type = OG_TYPE_REAL;
aggr_var->var.is_null = OG_TRUE;
VALUE(double, &aggr_var->var) = 1;
ass->avg_count++;
return OG_SUCCESS;
}
static status_t sql_aggr_init_default(aggr_assist_t *ass, aggr_var_t *aggr_var)
{
expr_node_t *aggr_node = ass->aggr_node;
if (OG_IS_VARLEN_TYPE(aggr_node->datatype)) {
aggr_str_t *aggr_str = GET_AGGR_VAR_STR(aggr_var);
aggr_str->str_result.vmid = OG_INVALID_ID32;
aggr_str->str_result.slot = OG_INVALID_ID32;
}
if (SECUREC_UNLIKELY(ass->aggr_type == AGGR_TYPE_SUM &&
aggr_node->value.v_func.orig_func_id == ID_FUNC_ITEM_COUNT)) {
aggr_var->var.type = OG_TYPE_BIGINT;
aggr_var->var.is_null = OG_FALSE;
aggr_var->var.v_bigint = 0;
} else {
aggr_var->var.type = aggr_node->datatype;
aggr_var->var.is_null = OG_TRUE;
if (ass->aggr_type == AGGR_TYPE_AVG || ass->aggr_type == AGGR_TYPE_AVG_COLLECT) {
GET_AGGR_VAR_AVG(aggr_var)->ex_avg_count = 0;
ass->avg_count++;
if (aggr_node->datatype <= OG_TYPE_REAL) {
aggr_var->var.type = OG_TYPE_REAL;
} else {
aggr_var->var.type = OG_TYPE_NUMBER;
}
}
}
return OG_SUCCESS;
}
static status_t sql_reset_sort_segment(sql_stmt_t *ogsql_stmt, sql_cursor_t *cursor)
{
mtrl_page_t *page = NULL;
mtrl_segment_t *segment = NULL;
if (cursor->mtrl.sort_seg == OG_INVALID_ID32) {
return OG_SUCCESS;
}
segment = ogsql_stmt->mtrl.segments[cursor->mtrl.sort_seg];
if (segment->vm_list.count == 0 || segment->curr_page == NULL) {
return OG_SUCCESS;
}
page = (mtrl_page_t *)segment->curr_page->data;
if (segment->vm_list.count == 1 && page->rows == 0) {
return OG_SUCCESS;
}
mtrl_close_segment2(&ogsql_stmt->mtrl, segment);
sql_free_segment_in_vm(ogsql_stmt, cursor->mtrl.sort_seg);
vm_free_list(ogsql_stmt->mtrl.session, ogsql_stmt->mtrl.pool, &segment->vm_list);
OG_RETURN_IFERR(mtrl_extend_segment(&ogsql_stmt->mtrl, segment));
return mtrl_open_segment2(&ogsql_stmt->mtrl, segment);
}
static status_t sql_aggr_init_distinct(aggr_assist_t *ass, plan_node_t *plan)
{
if (ass->aggr_node->dis_info.need_distinct) {
hash_segment_t *hash_segment = (hash_segment_t *)ass->cursor->exec_data.aggr_dis;
hash_table_entry_t *hash_table_entry = (hash_table_entry_t *)((char *)hash_segment + sizeof(hash_segment_t));
uint32 bucket_num = sql_get_plan_hash_rows(ass->stmt, plan);
OG_RETURN_IFERR(vm_hash_table_alloc(&hash_table_entry[ass->aggr_node->dis_info.idx], hash_segment, bucket_num));
OG_RETURN_IFERR(
vm_hash_table_init(hash_segment, &hash_table_entry[ass->aggr_node->dis_info.idx], NULL, NULL, NULL));
}
return OG_SUCCESS;
}
* this function is called every time the calculation of one group of rows is done.
*/
status_t sql_init_aggr_values(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan, galist_t *aggrs, uint32 *avgs)
{
const aggr_func_t *aggr_func = NULL;
aggr_var_t *aggr_var = NULL;
hash_segment_t *hash_segment = NULL;
aggr_assist_t ass;
SQL_INIT_AGGR_ASSIST(&ass, stmt, cursor);
if (cursor->exec_data.aggr_dis != NULL) {
hash_segment = (hash_segment_t *)cursor->exec_data.aggr_dis;
vm_hash_segment_deinit(hash_segment);
vm_hash_segment_init(KNL_SESSION(stmt), stmt->mtrl.pool, hash_segment, PMA_POOL, HASH_PAGES_HOLD,
HASH_AREA_SIZE);
}
for (uint32 i = 0; i < aggrs->count; i++) {
ass.aggr_node = (expr_node_t *)cm_galist_get(aggrs, i);
ass.aggr_type = GET_AGGR_FUNC(ass.aggr_node)->aggr_type;
aggr_var = sql_get_aggr_addr(cursor, i);
aggr_func = sql_get_aggr_func(ass.aggr_type);
OG_RETURN_IFERR(aggr_func->reset(aggr_var));
OG_RETURN_IFERR(aggr_func->init(&ass, aggr_var));
OG_RETURN_IFERR(sql_aggr_init_distinct(&ass, plan));
}
OG_RETURN_IFERR(sql_reset_sort_segment(stmt, cursor));
(*avgs) = (uint32)ass.avg_count;
return OG_SUCCESS;
}
static status_t sql_mtrl_aggr_page_alloc(sql_stmt_t *stmt, sql_cursor_t *cursor, uint32 size, void **result)
{
CM_POINTER4(stmt, cursor, cursor->aggr_page, result);
if (cursor->aggr_page->free_begin + size > OG_VMEM_PAGE_SIZE - sizeof(mtrl_page_t)) {
OG_THROW_ERROR(ERR_NO_FREE_VMEM, "one page free size is smaller than needed memory");
return OG_ERROR;
}
*result = sql_get_aggr_free_start_addr(cursor);
cursor->aggr_page->free_begin += size;
return OG_SUCCESS;
}
static bool32 sql_judge_func_arg_type(sql_stmt_t *stmt, expr_node_t *aggr_node, variant_t *sep_val)
{
const sql_func_t *func = GET_AGGR_FUNC(aggr_node);
OGSQL_SAVE_STACK(stmt);
if (func->builtin_func_id == ID_FUNC_ITEM_GROUP_CONCAT) {
if (!OG_IS_STRING_TYPE(sep_val->type)) {
OG_THROW_ERROR(ERR_INVALID_SEPARATOR, T2S(&func->name));
return OG_ERROR;
}
}
if (func->builtin_func_id == ID_FUNC_ITEM_LISTAGG) {
if (!OG_IS_STRING_TYPE(sep_val->type) && !OG_IS_NUMERIC_TYPE(sep_val->type) &&
!OG_IS_DATETIME_TYPE(sep_val->type)) {
OG_SRC_THROW_ERROR(aggr_node->loc, ERR_SQL_SYNTAX_ERROR,
"the separator argument of listagg must be a string or number or date variant.");
return OG_ERROR;
}
if (!OG_IS_STRING_TYPE(sep_val->type)) {
if (sql_var_as_string(stmt, sep_val) != OG_SUCCESS) {
OGSQL_RESTORE_STACK(stmt);
return OG_ERROR;
}
}
}
OGSQL_RESTORE_STACK(stmt);
return OG_SUCCESS;
}
static status_t if_sepvar_exprn_is_certain(visit_assist_t *v_ast, expr_node_t **exprn)
{
expr_node_t *ori_exprn = sql_get_origin_ref(*exprn);
if (ori_exprn->type == EXPR_NODE_RESERVED) {
if (!(chk_if_reserved_word_constant(ori_exprn->value.v_res.res_id) || is_ancestor_res_rowid(ori_exprn))) {
v_ast->result0 = OG_FALSE;
}
return OG_SUCCESS;
}
if (ori_exprn->type == EXPR_NODE_COLUMN || ori_exprn->type == EXPR_NODE_TRANS_COLUMN) {
if (NODE_ANCESTOR(ori_exprn) == 0) {
v_ast->result0 = OG_FALSE;
}
return OG_SUCCESS;
}
return OG_SUCCESS;
}
static status_t sql_init_group_concat_sepvar(sql_stmt_t *stmt, sql_cursor_t *cursor, expr_node_t *aggr_node,
aggr_group_concat_t *aggr_group)
{
expr_tree_t *sep = NULL;
variant_t sep_val;
variant_t *sep_cpy = NULL;
sep = aggr_node->argument;
sep_cpy = &aggr_group->extra;
if (sep != NULL) {
OG_RETURN_IFERR(sql_exec_expr_node(stmt, sep->root, &sep_val));
OG_RETURN_IFERR(sql_judge_func_arg_type(stmt, aggr_node, &sep_val));
sep_cpy->type = sep_val.type;
sep_cpy->is_null = sep_val.is_null;
if (sep_cpy->is_null == OG_FALSE) {
expr_node_t *sepvar_exprn = sep->root;
visit_assist_t v_ast = {0};
sql_init_visit_assist(&v_ast, NULL, NULL);
v_ast.result0 = OG_TRUE;
OG_RETURN_IFERR(visit_expr_node(&v_ast, &sepvar_exprn, if_sepvar_exprn_is_certain));
if (v_ast.result0 == OG_FALSE) {
OG_SRC_THROW_ERROR(sep->loc, ERR_SQL_SYNTAX_ERROR, "Argument should be constant.");
return OG_ERROR;
}
OG_RETURN_IFERR(sql_mtrl_aggr_page_alloc(stmt, cursor, sep_val.v_text.len, (void **)&sep_cpy->v_text.str));
sep_cpy->v_text.len = sep_val.v_text.len;
if (sep_val.v_text.len != 0) {
MEMS_RETURN_IFERR(
memcpy_s(sep_cpy->v_text.str, sep_val.v_text.len, sep_val.v_text.str, sep_val.v_text.len));
}
}
} else {
sep_cpy->is_null = OG_TRUE;
sep_cpy->type = OG_TYPE_STRING;
sep_cpy->v_text.len = OG_INVALID_ID32;
sep_cpy->v_text.str = NULL;
}
return OG_SUCCESS;
}
static status_t sql_init_sort_4_listagg(sql_stmt_t *stmt, sql_cursor_t *cursor, expr_node_t *aggr_node,
aggr_var_t *aggr_var)
{
aggr_group_concat_t *aggr_group = GET_AGGR_VAR_GROUPCONCAT(aggr_var);
OG_RETURN_IFERR(sql_init_group_concat_sepvar(stmt, cursor, aggr_node, aggr_group));
aggr_var->var.type = aggr_node->datatype;
if (aggr_node->sort_items != NULL && cursor->mtrl.sort_seg == OG_INVALID_ID32) {
OG_RETURN_IFERR(mtrl_create_segment(&stmt->mtrl, MTRL_SEGMENT_SORT_SEG, NULL, &cursor->mtrl.sort_seg));
OG_RETURN_IFERR(mtrl_open_segment(&stmt->mtrl, cursor->mtrl.sort_seg));
}
return OG_SUCCESS;
}
static status_t sql_init_sort_4_median(sql_stmt_t *stmt, sql_cursor_t *cursor, expr_node_t *aggr_node,
aggr_var_t *aggr_var)
{
if (cursor->mtrl.sort_seg == OG_INVALID_ID32) {
OG_RETURN_IFERR(mtrl_create_segment(&stmt->mtrl, MTRL_SEGMENT_SORT_SEG, NULL, &cursor->mtrl.sort_seg));
OG_RETURN_IFERR(mtrl_open_segment(&stmt->mtrl, cursor->mtrl.sort_seg));
}
return OG_SUCCESS;
}
static status_t sql_aggr_alloc_listagg(aggr_assist_t *ass, aggr_var_t *aggr_var)
{
aggr_group_concat_t *aggr_group = NULL;
OG_RETURN_IFERR(
sql_mtrl_aggr_page_alloc(ass->stmt, ass->cursor, sizeof(aggr_group_concat_t), (void **)&aggr_group));
aggr_var->extra_offset = (uint32)((char *)aggr_group - (char *)aggr_var);
aggr_var->extra_size = sizeof(aggr_group_concat_t);
aggr_group->aggr_str.aggr_bufsize = 0;
aggr_group->total_len = 0;
aggr_group->aggr_str.str_result.vmid = OG_INVALID_ID32;
aggr_group->aggr_str.str_result.slot = OG_INVALID_ID32;
aggr_group->sort_rid.slot = OG_INVALID_ID32;
aggr_group->type_buf = NULL;
* if the aggr function is group_concat, we need to create two buffers for it in the mtrl page.
* one is for the possible separator, the other for the temporary result for string concat
*/
return sql_init_sort_4_listagg(ass->stmt, ass->cursor, ass->aggr_node, aggr_var);
}
static status_t sql_aggr_alloc_min_max(aggr_assist_t *ass, aggr_var_t *aggr_var)
{
aggr_str_t *aggr_str = NULL;
if (OG_IS_VARLEN_TYPE(ass->aggr_node->datatype)) {
OG_RETURN_IFERR(sql_mtrl_aggr_page_alloc(ass->stmt, ass->cursor, sizeof(aggr_str_t), (void **)&aggr_str));
aggr_var->extra_offset = (uint32)((char *)aggr_str - (char *)aggr_var);
aggr_var->extra_size = sizeof(aggr_str_t);
aggr_str->aggr_bufsize = 0;
aggr_str->str_result.vmid = OG_INVALID_ID32;
aggr_str->str_result.slot = OG_INVALID_ID32;
} else {
aggr_var->extra_offset = 0;
aggr_var->extra_size = 0;
}
return OG_SUCCESS;
}
static inline status_t sql_aggr_alloc_stddev(aggr_assist_t *ass, aggr_var_t *aggr_var)
{
aggr_stddev_t *aggr_stddev = NULL;
OG_RETURN_IFERR(sql_mtrl_aggr_page_alloc(ass->stmt, ass->cursor, sizeof(aggr_stddev_t), (void **)&aggr_stddev));
aggr_var->extra_offset = (uint32)((char *)aggr_stddev - (char *)aggr_var);
aggr_var->extra_size = sizeof(aggr_stddev_t);
aggr_stddev->extra.is_null = OG_TRUE;
return OG_SUCCESS;
}
static inline status_t sql_aggr_alloc_covar(aggr_assist_t *ass, aggr_var_t *aggr_var)
{
aggr_covar_t *aggr_covar = NULL;
OG_RETURN_IFERR(sql_mtrl_aggr_page_alloc(ass->stmt, ass->cursor, sizeof(aggr_covar_t), (void **)&aggr_covar));
aggr_var->extra_offset = (uint32)((char *)aggr_covar - (char *)aggr_var);
aggr_var->extra_size = sizeof(aggr_covar_t);
aggr_covar->extra.is_null = OG_TRUE;
aggr_covar->extra_1.is_null = OG_TRUE;
return OG_SUCCESS;
}
static inline status_t sql_aggr_alloc_corr(aggr_assist_t *ass, aggr_var_t *aggr_var)
{
aggr_corr_t *aggr_corr = NULL;
OG_RETURN_IFERR(sql_mtrl_aggr_page_alloc(ass->stmt, ass->cursor, sizeof(aggr_corr_t), (void **)&aggr_corr));
aggr_var->extra_offset = (uint32)((char *)aggr_corr - (char *)aggr_var);
aggr_var->extra_size = sizeof(aggr_corr_t);
return OG_SUCCESS;
}
static inline status_t sql_aggr_alloc_avg(aggr_assist_t *ass, aggr_var_t *aggr_var)
{
aggr_avg_t *aggr_avg = NULL;
OG_RETURN_IFERR(sql_mtrl_aggr_page_alloc(ass->stmt, ass->cursor, sizeof(aggr_avg_t), (void **)&aggr_avg));
aggr_var->extra_offset = (uint32)((char *)aggr_avg - (char *)aggr_var);
aggr_var->extra_size = sizeof(aggr_avg_t);
aggr_avg->ex_avg_count = 0;
return OG_SUCCESS;
}
static inline status_t sql_aggr_alloc_median(aggr_assist_t *ass, aggr_var_t *aggr_var)
{
aggr_median_t *aggr_median = NULL;
OG_RETURN_IFERR(sql_mtrl_aggr_page_alloc(ass->stmt, ass->cursor, sizeof(aggr_median_t), (void **)&aggr_median));
aggr_var->extra_offset = (uint32)((char *)aggr_median - (char *)aggr_var);
aggr_var->extra_size = sizeof(aggr_median_t);
aggr_median->median_count = 0;
aggr_median->sort_rid.vmid = OG_INVALID_ID32;
aggr_median->sort_rid.slot = OG_INVALID_ID32;
aggr_median->type_buf = NULL;
return sql_init_sort_4_median(ass->stmt, ass->cursor, ass->aggr_node, aggr_var);
}
static inline status_t sql_aggr_alloc_dense_rank(aggr_assist_t *ass, aggr_var_t *aggr_var)
{
aggr_dense_rank_t *aggr_dense_rank = NULL;
OG_RETURN_IFERR(
sql_mtrl_aggr_page_alloc(ass->stmt, ass->cursor, sizeof(aggr_dense_rank_t), (void **)&aggr_dense_rank));
aggr_var->extra_offset = (uint32)((char *)aggr_dense_rank - (char *)aggr_var);
aggr_var->extra_size = sizeof(aggr_dense_rank_t);
return OG_SUCCESS;
}
static inline status_t sql_aggr_alloc_default(aggr_assist_t *ass, aggr_var_t *aggr_var)
{
aggr_var->extra_size = 0;
aggr_var->extra_offset = 0;
return OG_SUCCESS;
}
* aggr_var_t for the aggregate function such as group_concat needs to store the
* intermediate result in a buffer. as the aggr_var_t itself is allocated in the
* mtrl page, so we can create the buffer in the mtrl page, too.
*
* @Note
* this function is called only once after the mtrl page created.
* pay attention to the difference from the timing of sql_init_aggr_values()
*/
status_t sql_init_aggr_page(sql_stmt_t *stmt, sql_cursor_t *cursor, galist_t *aggrs)
{
const sql_func_t *func = NULL;
char *aggr_vars_start = NULL;
aggr_assist_t ass;
SQL_INIT_AGGR_ASSIST(&ass, stmt, cursor);
cursor->aggr_page = mtrl_curr_page(&stmt->mtrl, cursor->mtrl.aggr);
if (cursor->aggr_page->free_begin + aggrs->count * sizeof(aggr_var_t) > OG_VMEM_PAGE_SIZE - sizeof(mtrl_page_t)) {
OG_THROW_ERROR(ERR_TOO_MANY_ARRG);
return OG_ERROR;
}
cursor->aggr_page->free_begin += aggrs->count * sizeof(aggr_var_t);
cursor->aggr_page->rows = 0;
aggr_vars_start = ((char *)cursor->aggr_page + sizeof(mtrl_page_t));
MEMS_RETURN_IFERR(
memset_s((void *)aggr_vars_start, aggrs->count * sizeof(aggr_var_t), 0, aggrs->count * sizeof(aggr_var_t)));
for (uint32 i = 0; i < aggrs->count; i++) {
aggr_var_t *aggr_var = sql_get_aggr_addr(cursor, i);
ass.aggr_node = (expr_node_t *)cm_galist_get(aggrs, i);
func = GET_AGGR_FUNC(ass.aggr_node);
aggr_var->aggr_type = func->aggr_type;
ass.aggr_type = func->aggr_type;
OG_RETURN_IFERR(sql_aggr_alloc_buf(&ass, aggr_var));
}
return OG_SUCCESS;
}
status_t sql_fetch_aggr(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan, bool32 *eof)
{
if (cursor->mtrl.aggr_fetched) {
if (cursor->mtrl.aggr != OG_INVALID_ID32) {
mtrl_close_segment(&stmt->mtrl, cursor->mtrl.aggr);
mtrl_release_segment(&stmt->mtrl, cursor->mtrl.aggr);
cursor->mtrl.aggr = OG_INVALID_ID32;
cursor->aggr_page = NULL;
}
OGSQL_RELEASE_SEGMENT(stmt, cursor->mtrl.aggr_str);
if (cursor->mtrl.sort_seg != OG_INVALID_ID32) {
mtrl_close_segment(&stmt->mtrl, cursor->mtrl.sort_seg);
sql_free_segment_in_vm(stmt, cursor->mtrl.sort_seg);
mtrl_release_segment(&stmt->mtrl, cursor->mtrl.sort_seg);
cursor->mtrl.sort_seg = OG_INVALID_ID32;
}
*eof = OG_TRUE;
return OG_SUCCESS;
}
*eof = OG_FALSE;
cursor->mtrl.aggr_fetched = OG_TRUE;
return OG_SUCCESS;
}
static inline status_t sql_aggr_alloc_appx_cntdis(aggr_assist_t *ass, aggr_var_t *aggr_var)
{
aggr_appx_cntdis_t *appx_cdist = NULL;
OG_RETURN_IFERR(sql_mtrl_aggr_page_alloc(ass->stmt, ass->cursor, sizeof(aggr_appx_cntdis_t), (void **)&appx_cdist));
OG_RETURN_IFERR(vmc_alloc_mem(&ass->cursor->vmc, APPX_MAP_SIZE, (void **)&appx_cdist->bitmap));
aggr_var->extra_offset = (uint32)((char *)appx_cdist - (char *)aggr_var);
aggr_var->extra_size = sizeof(aggr_appx_cntdis_t);
return OG_SUCCESS;
}
static inline status_t sql_aggr_init_appx_cntdis(aggr_assist_t *ass, aggr_var_t *aggr_var)
{
aggr_var->var.type = OG_TYPE_BIGINT;
aggr_var->var.is_null = OG_FALSE;
aggr_var->var.v_bigint = 0;
ass->avg_count++;
return OG_SUCCESS;
}
static inline status_t sql_aggr_reset_appx_cntdis(aggr_var_t *aggr_var)
{
aggr_appx_cntdis_t *aggr_appx = GET_AGGR_VAR_APPX_CDIST(aggr_var);
MEMS_RETURN_IFERR(memset_sp(aggr_appx->bitmap, APPX_MAP_SIZE, 0, APPX_MAP_SIZE));
return sql_aggr_reset_default(aggr_var);
}
static status_t sql_aggr_appx_cntdis(aggr_assist_t *ass, aggr_var_t *aggr_var, variant_t *value)
{
if (value->is_null) {
return OG_SUCCESS;
}
uint32 hashval = value->v_bigint;
uint32 bucket = hashval & (APPX_MAP_SIZE - 1);
uint8 bits = 0;
hashval >>= APPX_BITS;
while (bits < (UINT32_BITS - APPX_BITS) && OG_BIT_TEST(hashval, OG_GET_MASK(bits)) == 0) {
bits++;
}
aggr_appx_cntdis_t *appx = GET_AGGR_VAR_APPX_CDIST(aggr_var);
appx->bitmap[bucket] = MAX(appx->bitmap[bucket], bits + 1);
aggr_var->var.is_null = OG_FALSE;
return OG_SUCCESS;
}
static status_t sql_aggr_calc_appx_cntdis(aggr_assist_t *ass, aggr_var_t *aggr_var)
{
uint32 eblocks = 0;
double appx_sum = 0;
double appx_value = 0;
aggr_appx_cntdis_t *appx = GET_AGGR_VAR_APPX_CDIST(aggr_var);
for (uint32 i = 0; i < APPX_MAP_SIZE; ++i) {
appx_sum += 1.0 / ((uint32)1 << appx->bitmap[i]);
eblocks += (appx->bitmap[i] == 0) ? 1 : 0;
}
if (appx_sum > 0) {
appx_value = APPX_ALPHA_MM / appx_sum;
if (appx_value <= APPX_MIN_VAL && eblocks > 0) {
appx_value = APPX_MAP_SIZE * log((double)APPX_MAP_SIZE / eblocks);
} else if (appx_value > APPX_MAX_VAL) {
appx_value = -log(1 - appx_value / ((uint64)1 << UINT32_BITS)) * ((uint64)1 << UINT32_BITS);
}
}
aggr_var->var.v_bigint = (int64)appx_value;
aggr_var->var.type = OG_TYPE_BIGINT;
aggr_var->var.is_null = OG_FALSE;
return OG_SUCCESS;
}
* **NOTE:**
* 1. The function must be arranged by alphabetical ascending order.
* 2. An enum stands for function index was added in ogsql_func.h.
* if any built-in function added or removed from the following array,
* please modify the enum definition, too.
* 3. add function should add the define id in en_sql_aggr_type at ogsql_func.h.
*/
aggr_func_t g_aggr_func_tab[] = {
{ AGGR_TYPE_NONE, sql_aggr_alloc_default, sql_aggr_init_default, sql_aggr_reset_default, sql_aggr_none,
sql_aggr_calc_none },
{ AGGR_TYPE_AVG, sql_aggr_alloc_avg, sql_aggr_init_default, sql_aggr_reset_avg, sql_aggr_avg, sql_aggr_calc_avg },
{ AGGR_TYPE_SUM, sql_aggr_alloc_default, sql_aggr_init_default, sql_aggr_reset_default, sql_aggr_sum,
sql_aggr_calc_none },
{ AGGR_TYPE_MIN, sql_aggr_alloc_min_max, sql_aggr_init_default, sql_aggr_reset_min_max, sql_aggr_min_max,
sql_aggr_calc_none },
{ AGGR_TYPE_MAX, sql_aggr_alloc_min_max, sql_aggr_init_default, sql_aggr_reset_min_max, sql_aggr_min_max,
sql_aggr_calc_none },
{ AGGR_TYPE_COUNT, sql_aggr_alloc_default, sql_aggr_init_count, sql_aggr_reset_default, sql_aggr_count,
sql_aggr_calc_none },
{ AGGR_TYPE_AVG_COLLECT, sql_aggr_alloc_avg, sql_aggr_init_default, sql_aggr_reset_avg, sql_aggr_avg,
sql_aggr_calc_avg },
{ AGGR_TYPE_GROUP_CONCAT, sql_aggr_alloc_listagg, sql_aggr_init_listagg, sql_aggr_reset_listagg, sql_aggr_listagg,
sql_aggr_calc_listagg },
{ AGGR_TYPE_STDDEV, sql_aggr_alloc_stddev, sql_aggr_init_stddev, sql_aggr_reset_stddev, sql_aggr_stddev,
sql_aggr_calc_stddev },
{ AGGR_TYPE_STDDEV_POP, sql_aggr_alloc_stddev, sql_aggr_init_stddev, sql_aggr_reset_stddev, sql_aggr_stddev,
sql_aggr_calc_stddev },
{ AGGR_TYPE_STDDEV_SAMP, sql_aggr_alloc_stddev, sql_aggr_init_stddev, sql_aggr_reset_stddev, sql_aggr_stddev,
sql_aggr_calc_stddev },
{ AGGR_TYPE_LAG, sql_aggr_alloc_default, sql_aggr_init_default, sql_aggr_reset_default, sql_aggr_none,
sql_aggr_calc_none },
{ AGGR_TYPE_ARRAY_AGG, sql_aggr_alloc_default, sql_aggr_init_array_agg, sql_aggr_reset_array_agg,
sql_aggr_array_agg, sql_aggr_calc_none },
{ AGGR_TYPE_NTILE, sql_aggr_alloc_default, sql_aggr_init_default, sql_aggr_reset_default, sql_aggr_none,
sql_aggr_calc_none },
{ AGGR_TYPE_MEDIAN, sql_aggr_alloc_median, sql_aggr_init_median, sql_aggr_reset_median, sql_aggr_median,
sql_aggr_calc_median },
{ AGGR_TYPE_CUME_DIST, sql_aggr_alloc_avg, sql_aggr_init_cume_dist, sql_aggr_reset_avg, sql_aggr_cume_dist,
sql_aggr_calc_cume_dist },
{ AGGR_TYPE_VARIANCE, sql_aggr_alloc_stddev, sql_aggr_init_stddev, sql_aggr_reset_stddev, sql_aggr_stddev,
sql_aggr_calc_stddev },
{ AGGR_TYPE_VAR_POP, sql_aggr_alloc_stddev, sql_aggr_init_stddev, sql_aggr_reset_stddev, sql_aggr_stddev,
sql_aggr_calc_stddev },
{ AGGR_TYPE_VAR_SAMP, sql_aggr_alloc_stddev, sql_aggr_init_stddev, sql_aggr_reset_stddev, sql_aggr_stddev,
sql_aggr_calc_stddev },
{ AGGR_TYPE_COVAR_POP, sql_aggr_alloc_covar, sql_aggr_init_covar, sql_aggr_reset_covar, sql_aggr_covar,
sql_aggr_calc_covar },
{ AGGR_TYPE_COVAR_SAMP, sql_aggr_alloc_covar, sql_aggr_init_covar, sql_aggr_reset_covar, sql_aggr_covar,
sql_aggr_calc_covar },
{ AGGR_TYPE_CORR, sql_aggr_alloc_corr, sql_aggr_init_corr, sql_aggr_reset_corr, sql_aggr_corr, sql_aggr_calc_corr },
{ AGGR_TYPE_DENSE_RANK, sql_aggr_alloc_dense_rank, sql_aggr_init_rank, sql_aggr_reset_rank, sql_aggr_dense_rank,
sql_aggr_calc_none },
{ AGGR_TYPE_FIRST_VALUE, sql_aggr_alloc_default, sql_aggr_init_default, sql_aggr_reset_default, sql_aggr_none,
sql_aggr_calc_none },
{ AGGR_TYPE_LAST_VALUE, sql_aggr_alloc_default, sql_aggr_init_default, sql_aggr_reset_default, sql_aggr_none,
sql_aggr_calc_none },
{ AGGR_TYPE_RANK, sql_aggr_alloc_default, sql_aggr_init_rank, sql_aggr_reset_rank, sql_aggr_rank,
sql_aggr_calc_none },
{ AGGR_TYPE_APPX_CNTDIS, sql_aggr_alloc_appx_cntdis, sql_aggr_init_appx_cntdis, sql_aggr_reset_appx_cntdis,
sql_aggr_appx_cntdis, sql_aggr_calc_appx_cntdis },
};
status_t og_sql_aggr_get_value(sql_stmt_t *statement, uint32 id, variant_t *value)
{
sql_cursor_t *cursor = OGSQL_CURR_CURSOR(statement);
if (cursor == NULL) {
return OG_ERROR;
}
cursor = sql_get_group_cursor(cursor);
if (cursor == NULL) {
return OG_ERROR;
}
mtrl_cursor_t *mtrl_cursor = &cursor->mtrl.cursor;
if (mtrl_cursor->type == MTRL_CURSOR_HASH_GROUP || mtrl_cursor->type == MTRL_CURSOR_SORT_GROUP) {
if (mtrl_cursor->hash_group.aggrs == NULL) {
return OG_ERROR;
}
} else {
if (cursor->aggr_page == NULL) {
return OG_ERROR;
}
}
aggr_var_t *aggr_var = sql_get_aggr_addr(cursor, id);
var_copy(&aggr_var->var, value);
return OG_SUCCESS;
}