* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* ogsql_concate.c
*
*
* IDENTIFICATION
* src/ogsql/executor/ogsql_concate.c
*
* -------------------------------------------------------------------------
*/
#include "ogsql_concate.h"
#include "ogsql_select.h"
#include "ogsql_mtrl.h"
#include "ogsql_scan.h"
#include "srv_instance.h"
static inline status_t sql_alloc_concate_ctx(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
uint32 vmid;
vm_page_t *vm_page = NULL;
concate_ctx_t *concate_ctx = NULL;
plan_node_t *sub_plan = NULL;
uint32 bucket_num;
OG_RETURN_IFERR(vm_alloc(KNL_SESSION(stmt), KNL_SESSION(stmt)->temp_pool, &vmid));
if (vm_open(KNL_SESSION(stmt), KNL_SESSION(stmt)->temp_pool, vmid, &vm_page) != OG_SUCCESS) {
vm_free(KNL_SESSION(stmt), KNL_SESSION(stmt)->temp_pool, vmid);
return OG_ERROR;
}
concate_ctx = (concate_ctx_t *)vm_page->data;
concate_ctx->id = 0;
concate_ctx->vmid = vmid;
concate_ctx->curr_plan = NULL;
concate_ctx->keys = plan->cnct_p.keys;
concate_ctx->sub_plans = plan->cnct_p.plans;
concate_ctx->buf = (char *)vm_page->data + sizeof(concate_ctx_t);
sql_init_hash_iter(&concate_ctx->iter, NULL);
cursor->cnct_ctx = concate_ctx;
bucket_num = 0;
for (uint32 i = 0; i < concate_ctx->sub_plans->count; ++i) {
sub_plan = (plan_node_t *)cm_galist_get(concate_ctx->sub_plans, i);
bucket_num += sql_get_plan_hash_rows(stmt, sub_plan);
}
bucket_num = MIN(bucket_num, OG_HASH_JOIN_COUNT);
vm_hash_segment_init(KNL_SESSION(stmt), stmt->mtrl.pool, &concate_ctx->hash_segment, PMA_POOL, HASH_PAGES_HOLD,
HASH_AREA_SIZE);
OG_RETURN_IFERR(vm_hash_table_alloc(&concate_ctx->hash_table, &concate_ctx->hash_segment, bucket_num));
OG_RETURN_IFERR(vm_hash_table_init(&concate_ctx->hash_segment, &concate_ctx->hash_table, NULL, NULL, NULL));
return OG_SUCCESS;
}
void sql_free_concate_ctx(sql_stmt_t *ogsql_stmt, concate_ctx_t *ogx)
{
vm_hash_segment_deinit(&ogx->hash_segment);
vm_free(KNL_SESSION(ogsql_stmt), KNL_SESSION(ogsql_stmt)->temp_pool, ogx->vmid);
}
static inline status_t sql_execute_child_plan(sql_stmt_t *ogsql_stmt, sql_cursor_t *cursor, plan_node_t *sub_plan,
bool32 *eof)
{
switch (sub_plan->type) {
case PLAN_NODE_SCAN:
return sql_execute_scan(ogsql_stmt, cursor, sub_plan);
case PLAN_NODE_JOIN:
return sql_execute_join(ogsql_stmt, cursor, sub_plan, eof);
default:
break;
}
OG_THROW_ERROR(ERR_SQL_PLAN_ERROR, "not support plan type for concate", sub_plan->type);
return OG_ERROR;
}
static inline status_t sql_execute_for_concate(sql_stmt_t *ogsql_stmt, sql_cursor_t *cursor, concate_ctx_t *ogx,
bool32 switch_plan, bool32 *eof)
{
bool32 sub_eof = OG_FALSE;
ogx->id = switch_plan ? ogx->id + 1 : ogx->id;
while (ogx->id < ogx->sub_plans->count) {
ogx->curr_plan = (plan_node_t *)cm_galist_get(ogx->sub_plans, ogx->id);
OG_RETURN_IFERR(sql_execute_child_plan(ogsql_stmt, cursor, ogx->curr_plan, &sub_eof));
if (!sub_eof) {
return OG_SUCCESS;
}
++ogx->id;
}
*eof = OG_TRUE;
return OG_SUCCESS;
}
static inline status_t sql_fetch_sub_plan(sql_stmt_t *ogsql_stmt, sql_cursor_t *cursor, plan_node_t *sub_plan, bool32
*eof)
{
switch (sub_plan->type) {
case PLAN_NODE_JOIN:
return sql_fetch_join(ogsql_stmt, cursor, sub_plan, eof);
case PLAN_NODE_SCAN:
cursor->last_table = sub_plan->scan_p.table->plan_id;
return sql_fetch_scan(ogsql_stmt, cursor, sub_plan, eof);
default:
break;
}
OG_THROW_ERROR(ERR_SQL_PLAN_ERROR, "not support plan type for concate", sub_plan->type);
return OG_ERROR;
}
static status_t sql_fetch_for_concate(sql_stmt_t *stmt, sql_cursor_t *cursor, concate_ctx_t *ogx, bool32 *eof)
{
bool32 sub_eof = OG_FALSE;
*eof = OG_FALSE;
while (OG_TRUE) {
OG_RETURN_IFERR(sql_fetch_sub_plan(stmt, cursor, ogx->curr_plan, &sub_eof));
if (!sub_eof) {
return OG_SUCCESS;
}
OG_RETURN_IFERR(sql_execute_for_concate(stmt, cursor, ogx, OG_TRUE, eof));
if (*eof) {
return OG_SUCCESS;
}
sub_eof = OG_FALSE;
}
}
static inline status_t make_concate_hash_key(sql_stmt_t *stmt, galist_t *keys, char *buf)
{
variant_t value;
expr_tree_t *key = NULL;
row_assist_t ra;
row_init(&ra, buf, OG_MAX_ROW_SIZE, keys->count);
for (uint32 i = 0; i < keys->count; i++) {
key = (expr_tree_t *)cm_galist_get(keys, i);
OG_RETURN_IFERR(sql_exec_expr(stmt, key, &value));
OG_RETURN_IFERR(sql_put_row_value(stmt, NULL, &ra, key->root->datatype, &value));
}
return OG_SUCCESS;
}
status_t sql_execute_concate(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan)
{
CM_TRACE_BEGIN;
if (cursor->cnct_ctx != NULL) {
sql_free_concate_ctx(stmt, cursor->cnct_ctx);
cursor->cnct_ctx = NULL;
}
OG_RETURN_IFERR(sql_alloc_concate_ctx(stmt, cursor, plan));
OG_RETURN_IFERR(sql_execute_for_concate(stmt, cursor, cursor->cnct_ctx, OG_FALSE, &cursor->eof));
CM_TRACE_END(stmt, plan->plan_id);
return OG_SUCCESS;
}
status_t sql_fetch_concate(sql_stmt_t *stmt, sql_cursor_t *cursor, plan_node_t *plan, bool32 *eof)
{
bool32 exist_row = OG_FALSE;
concate_ctx_t *ogx = cursor->cnct_ctx;
CM_TRACE_BEGIN;
if (cursor->eof) {
*eof = OG_TRUE;
return OG_SUCCESS;
}
for (;;) {
OGSQL_SAVE_STACK(stmt);
OG_RETURN_IFERR(sql_fetch_for_concate(stmt, cursor, ogx, eof));
if (*eof) {
OGSQL_RESTORE_STACK(stmt);
CM_TRACE_END(stmt, plan->plan_id);
return OG_SUCCESS;
}
OG_RETURN_IFERR(make_concate_hash_key(stmt, ogx->keys, ogx->buf));
OG_RETURN_IFERR(vm_hash_table_insert2(&exist_row, &ogx->hash_segment, &ogx->hash_table, ogx->buf,
((row_head_t *)ogx->buf)->size));
OGSQL_RESTORE_STACK(stmt);
if (!exist_row) {
break;
}
}
CM_TRACE_END(stmt, plan->plan_id);
return OG_SUCCESS;
}