* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* opfusion_selectforann.cpp
* Definition of select operator for bypass executor.
*
* IDENTIFICATION
* src/gausskernel/runtime/opfusion/opfusion_selectforann.cpp
*
* ---------------------------------------------------------------------------------------
*/
#include "opfusion/opfusion_selectforann.h"
#include "opfusion/opfusion_annindexscan.h"
#include "utils/fmgroids.h"
SelectForAnnFusion::SelectForAnnFusion(MemoryContext context, CachedPlanSource* psrc, List* plantree_list, ParamListInfo params)
: OpFusion(context, psrc, plantree_list)
{
MemoryContext old_context = NULL;
reInitLimit = false;
nodeLimitCount = NULL;
nodeLimitOffset = NULL;
if (!IsGlobal()) {
old_context = MemoryContextSwitchTo(m_global->m_context);
InitGlobals();
MemoryContextSwitchTo(old_context);
} else {
m_c_global = ((SelectForAnnFusion*)(psrc->opFusionObj))->m_c_global;
}
old_context = MemoryContextSwitchTo(m_local.m_localContext);
InitLocals(params);
MemoryContextSwitchTo(old_context);
}
void SelectForAnnFusion::InitGlobals()
{
m_global->m_reloid = 0;
m_c_global = (SelectForAnnFusionGlobalVariable*)palloc0(sizeof(SelectForAnnFusionGlobalVariable));
m_c_global->m_limitCount = -1;
m_c_global->m_limitOffset = -1;
if (IsA(m_global->m_planstmt->planTree, Limit)) {
Limit* limit = (Limit*)m_global->m_planstmt->planTree;
if (limit->limitCount != NULL && IsA(limit->limitCount, Const) && !((Const*)limit->limitCount)->constisnull) {
m_c_global->m_limitCount = DatumGetInt64(((Const*)limit->limitCount)->constvalue);
} else if (limit->limitCount != NULL &&
(IsA(limit->limitCount, Param) || IsA(limit->limitCount, FuncExpr))) {
reInitLimit = true;
nodeLimitCount = limit->limitCount;
}
if (limit->limitOffset != NULL && IsA(limit->limitOffset, Const) &&
!((Const*)limit->limitOffset)->constisnull) {
m_c_global->m_limitOffset = DatumGetInt64(((Const*)limit->limitOffset)->constvalue);
} else if (limit->limitOffset != NULL && IsA(limit->limitOffset, Param)) {
reInitLimit = true;
nodeLimitOffset = limit->limitOffset;
}
}
}
void SelectForAnnFusion::InitLocals(ParamListInfo params)
{
m_local.m_tmpvals = NULL;
m_local.m_values = NULL;
m_local.m_isnull = NULL;
m_local.m_tmpisnull = NULL;
initParams(params);
m_local.m_receiver = NULL;
m_local.m_isInsideRec = true;
Node* node = NULL;
if (IsA(m_global->m_planstmt->planTree, Limit)) {
node = JudgePlanIsPartIterator(m_global->m_planstmt->planTree->lefttree);
} else {
node = JudgePlanIsPartIterator(m_global->m_planstmt->planTree);
}
m_local.m_scan = New(CurrentMemoryContext) AnnIndexScanFusion((AnnIndexScan*)node, m_global->m_planstmt,
m_local.m_outParams ? m_local.m_outParams : m_local.m_params);
if (!IsGlobal()) {
MemoryContext old_context = MemoryContextSwitchTo(m_global->m_context);
m_global->m_tupDesc = CreateTupleDescCopy(m_local.m_scan->m_tupDesc);
MemoryContextSwitchTo(old_context);
}
m_local.m_reslot = NULL;
if (reInitLimit) {
((AnnIndexScanFusion*)(m_local.m_scan))->SetUseLimit();
}
}
static Datum GetExprVal(Expr* node, ParamListInfo params)
{
Datum ret = 0;
switch (nodeTag(node)) {
case T_FuncExpr: {
FuncExpr* funcexpr = (FuncExpr*)node;
Datum arg = GetExprVal((Expr*)funcexpr->args, params);
if (funcexpr->funcid == F_INT48) {
int32 arg1 = DatumGetInt32(arg);
ret = Int64GetDatum(arg1);
} else if (funcexpr->funcid == F_I2TOI4) {
int16 arg1 = DatumGetInt16(arg);
ret = Int32GetDatum(arg1);
}
} break;
case T_List: {
List* list = (List*)node;
if (list_length(list) == 1) {
return GetExprVal((Expr*)linitial(list), params);
}
} break;
case T_Param: {
Param* param = (Param*)node;
if (param->paramid <= params->numParams) {
ret = params->params[param->paramid - 1].value;
}
} break;
default:
break;
}
return ret;
}
void SelectForAnnFusion::InitLimit(ParamListInfo params)
{
if (!reInitLimit) {
return;
}
if (nodeLimitCount != NULL && IsA(nodeLimitCount, Param)) {
Param* param = (Param*)nodeLimitCount;
m_c_global->m_limitCount = params->params[param->paramid - 1].value;
} else if (nodeLimitCount != NULL && IsA(nodeLimitCount, FuncExpr)) {
Datum val = GetExprVal((Expr*)nodeLimitCount, params);
m_c_global->m_limitCount = DatumGetInt64(val);
}
if (nodeLimitOffset != NULL && IsA(nodeLimitOffset, Param)) {
Param* param = (Param*)nodeLimitOffset;
m_c_global->m_limitOffset = params->params[param->paramid - 1].value;
}
}
bool SelectForAnnFusion::execute(long max_rows, char* completionTag)
{
MemoryContext oldContext = MemoryContextSwitchTo(m_local.m_tmpContext);
bool success = false;
int64 start_row = 0;
int64 get_rows = 0;
ParamListInfo params = m_local.m_outParams == NULL ? m_local.m_params : m_local.m_outParams;
* step 1: prepare *
*******************/
InitLimit(params);
start_row = m_c_global->m_limitOffset >= 0 ? m_c_global->m_limitOffset : start_row;
int64 alreadyfetch = (m_local.m_position > start_row) ? (m_local.m_position - start_row) : 0;
get_rows = max_rows;
if (m_c_global->m_limitCount >= 0) {
int64 limit_row = (m_c_global->m_limitCount - alreadyfetch > 0) ? m_c_global->m_limitCount - alreadyfetch : 0;
get_rows = (limit_row > max_rows) ? max_rows : limit_row;
}
* step 2: begin scan *
**********************/
if (m_local.m_position == 0) {
m_local.m_scan->refreshParameter(params);
long ef_search = start_row + get_rows;
ef_search = (ef_search > u_sess->datavec_ctx.hnsw_ef_search) ? ef_search : 0;
m_local.m_scan->Init(ef_search);
}
setReceiver();
unsigned long nprocessed = 0;
TupleTableSlot* offset_reslot = NULL;
while (m_local.m_position < (long)start_row && (offset_reslot = m_local.m_scan->getTupleSlot()) != NULL) {
tpslot_free_heaptuple(offset_reslot);
m_local.m_position++;
}
if (m_local.m_position < (long)start_row) {
Assert(offset_reslot == NULL);
get_rows = 0;
m_local.m_isCompleted = true;
}
while (nprocessed < (unsigned long)get_rows && (m_local.m_reslot = m_local.m_scan->getTupleSlot()) != NULL) {
CHECK_FOR_INTERRUPTS();
m_local.m_position++;
nprocessed++;
(*m_local.m_receiver->receiveSlot)(m_local.m_reslot, m_local.m_receiver);
tpslot_free_heaptuple(m_local.m_reslot);
}
if (!ScanDirectionIsNoMovement(*(m_local.m_scan->m_direction))) {
bool has_complete = (max_rows == 0 || nprocessed < (unsigned long)max_rows);
if (has_complete) {
m_local.m_isCompleted = true;
}
} else {
m_local.m_isCompleted = true;
}
if (m_local.m_portalName == NULL || m_local.m_portalName[0] == '\0') {
m_local.m_isCompleted = true;
}
success = true;
* step 3: done *
****************/
if (m_local.m_isInsideRec) {
(*m_local.m_receiver->rDestroy)(m_local.m_receiver);
}
m_local.m_scan->End(m_local.m_isCompleted);
if (m_local.m_isCompleted) {
m_local.m_position = 0;
}
errno_t errorno =
snprintf_s(completionTag, COMPLETION_TAG_BUFSIZE, COMPLETION_TAG_BUFSIZE - 1, "SELECT %lu", nprocessed);
securec_check_ss(errorno, "\0", "\0");
MemoryContextSwitchTo(oldContext);
UniqueSQLStatCountReturnedRows(nprocessed);
return success;
}
void SelectForAnnFusion::close()
{
if (m_local.m_isCompleted == false) {
m_local.m_scan->End(true);
m_local.m_isCompleted = true;
m_local.m_position = 0;
UnregisterSnapshot(m_local.m_snapshot);
m_local.m_snapshot = NULL;
}
}