* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* opfusion_sort.cpp
* Definition of sort operator for bypass executor.
*
* IDENTIFICATION
* src/gausskernel/runtime/opfusion/opfusion_sort.cpp
*
* ---------------------------------------------------------------------------------------
*/
#include "opfusion/opfusion_sort.h"
#include "access/tableam.h"
SortFusion::SortFusion(MemoryContext context, CachedPlanSource* psrc, List* plantree_list, ParamListInfo params)
: OpFusion(context, psrc, plantree_list)
{
MemoryContext old_context = NULL;
if (!IsGlobal()) {
old_context = MemoryContextSwitchTo(m_global->m_context);
InitGlobals();
MemoryContextSwitchTo(old_context);
}
old_context = MemoryContextSwitchTo(m_local.m_localContext);
InitLocals(params);
MemoryContextSwitchTo(old_context);
}
void SortFusion::InitLocals(ParamListInfo params)
{
Plan *node = (Plan*)m_global->m_planstmt->planTree->lefttree;
Sort *sortnode = (Sort *)m_global->m_planstmt->planTree;
m_c_local.m_scanDesc = ExecTypeFromTL(node->targetlist, false);
initParams(params);
m_local.m_receiver = NULL;
m_local.m_isInsideRec = true;
m_local.m_scan = ScanFusion::getScanFusion((Node*)sortnode->plan.lefttree, m_global->m_planstmt,
m_local.m_outParams ? m_local.m_outParams : m_local.m_params);
m_c_local.m_scanDesc->td_tam_ops = m_local.m_scan->m_tupDesc->td_tam_ops;
if (!IsGlobal())
m_global->m_tupDesc->td_tam_ops = m_local.m_scan->m_tupDesc->td_tam_ops;
m_local.m_reslot = MakeSingleTupleTableSlot(m_global->m_tupDesc, false, m_local.m_scan->m_tupDesc->td_tam_ops);
m_local.m_values = (Datum*)palloc0(m_global->m_tupDesc->natts * sizeof(Datum));
m_local.m_isnull = (bool*)palloc0(m_global->m_tupDesc->natts * sizeof(bool));
}
void SortFusion::InitGlobals()
{
m_global->m_reloid = 0;
Sort *sortnode = (Sort *)m_global->m_planstmt->planTree;
m_global->m_tupDesc = ExecCleanTypeFromTL(sortnode->plan.targetlist, false);
m_global->m_attrno = (int16 *)palloc(m_global->m_tupDesc->natts * sizeof(int16));
ListCell *lc = NULL;
int cur_resno = 1;
foreach (lc, sortnode->plan.targetlist) {
TargetEntry *res = (TargetEntry *)lfirst(lc);
if (res->resjunk)
continue;
Var *var = (Var *)res->expr;
m_global->m_attrno[cur_resno - 1] = var->varattno;
cur_resno++;
}
}
bool SortFusion::execute(long max_rows, char *completionTag)
{
max_rows = FETCH_ALL;
bool success = false;
TimestampTz startTime = 0;
TupleTableSlot *reslot = m_local.m_reslot;
Datum *values = m_local.m_values;
bool *isnull = m_local.m_isnull;
for (int i = 0; i < m_global->m_tupDesc->natts; i++) {
isnull[i] = true;
}
UpdateUniqueSQLSortStats(NULL, &startTime);
m_local.m_scan->refreshParameter(m_local.m_outParams == NULL ? m_local.m_params : m_local.m_outParams);
m_local.m_scan->Init(max_rows);
setReceiver();
Tuplesortstate *tuplesortstate = NULL;
Sort *sortnode = (Sort *)m_global->m_planstmt->planTree;
int64 sortMem = SET_NODEMEM(sortnode->plan.operatorMemKB[0], sortnode->plan.dop);
int64 maxMem = (sortnode->plan.operatorMaxMem > 0) ?
SET_NODEMEM(sortnode->plan.operatorMaxMem, sortnode->plan.dop) : 0;
{
AutoContextSwitch memSwitch(m_local.m_tmpContext);
tuplesortstate = tuplesort_begin_heap(m_c_local.m_scanDesc,
sortnode->numCols,
sortnode->sortColIdx,
sortnode->sortOperators,
sortnode->collations,
sortnode->nullsFirst,
sortMem,
false,
maxMem,
sortnode->plan.plan_node_id,
SET_DOP(sortnode->plan.dop));
}
long nprocessed = 0;
TupleTableSlot *slot = NULL;
while ((slot = m_local.m_scan->getTupleSlot()) != NULL) {
tuplesort_puttupleslot(tuplesortstate, slot);
}
tuplesort_performsort(tuplesortstate);
UpdateUniqueSQLSortStats(tuplesortstate, &startTime);
slot = MakeSingleTupleTableSlot(m_c_local.m_scanDesc);
while (tuplesort_gettupleslot(tuplesortstate, true, slot, NULL)) {
tableam_tslot_getsomeattrs(slot, m_c_local.m_scanDesc->natts);
for (int i = 0; i < m_global->m_tupDesc->natts; i++) {
values[i] = slot->tts_values[m_global->m_attrno[i] - 1];
isnull[i] = slot->tts_isnull[m_global->m_attrno[i] - 1];
}
HeapTuple tmptup = (HeapTuple)tableam_tops_form_tuple(m_global->m_tupDesc, values, isnull,
m_global->m_tupDesc->td_tam_ops);
(void)ExecStoreTuple(tmptup, reslot, InvalidBuffer, false);
(*m_local.m_receiver->receiveSlot)(reslot, m_local.m_receiver);
tableam_tops_free_tuple(tmptup);
CHECK_FOR_INTERRUPTS();
nprocessed++;
if (nprocessed >= max_rows) {
break;
}
}
success = true;
if (m_local.m_isInsideRec == true) {
(*m_local.m_receiver->rDestroy)(m_local.m_receiver);
}
m_local.m_isCompleted = true;
m_local.m_scan->End(true);
tuplesort_end(tuplesortstate);
errno_t errorno = snprintf_s(completionTag, COMPLETION_TAG_BUFSIZE, COMPLETION_TAG_BUFSIZE - 1,
"SELECT %lu", nprocessed);
securec_check_ss(errorno, "\0", "\0");
return success;
}