/*
 * Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* opfusion_insert.cpp
* Definition of insert operator for bypass executor.
*
* IDENTIFICATION
* src/gausskernel/runtime/opfusion/opfusion_insert.cpp
*
* ---------------------------------------------------------------------------------------
*/
#include "opfusion/opfusion_insert.h"
#include "access/tableam.h"
#ifdef ENABLE_HTAP
#include "access/htap/imcstore_delta.h"
#endif
#include "catalog/pg_partition_fn.h"
#include "catalog/storage_gtt.h"
#include "commands/matview.h"
#include "commands/sequence.h"
#include "executor/node/nodeModifyTable.h"
#include "executor/node/nodeSeqscan.h"
#include "parser/parse_coerce.h"
#include "utils/partitionmap.h"
/* Defined outside this file; InsertSubFusion::execute calls it once the insert has finished. */
extern void ExecEndPlan(PlanState* planstate, EState* estate);
/* Constraint pre-check used by insert_real; defined later in this file. */
inline bool check_attisnull_exist(ResultRelInfo* ResultRelInfo, TupleTableSlot* slot);
/* Index maintenance helper used by insert_real; defined later in this file. */
static List* ExecInsertIndexTuplesOpfusion(TupleTableSlot* slot, ItemPointer tupleid, EState* estate,
Relation targetPartRel, Partition p, int2 bucketId, bool* conflict,
Bitmapset* modifiedIdxAttrs);
/* Per-row insert routine driven by InsertSubFusion::ExecInsert; defined later in this file. */
static TupleTableSlot* insert_real(ModifyTableState* state, TupleTableSlot* slot, EState* estate, bool canSetTag);
/*
 * InsertFusion::InitBaseParam
 *
 * Walk the target list of the INSERT and record, per entry, where its value comes from:
 *   - FuncExpr / OpExpr: appended to m_c_global->m_targetFuncNodes (function oid + argument list)
 *   - Param:             appended to m_global->m_paramLoc (param id + target position)
 *   - Const:             stored in m_c_global->m_targetConstLoc at the entry's position
 * Positions that are not constants keep constLoc == -1.
 * On return m_c_global->m_targetConstNum holds the number of target entries visited.
 *
 * targetList: list of TargetEntry nodes taken from the plan below the ModifyTable node
 */
void InsertFusion::InitBaseParam(List* targetList)
{
    int pos = 0;
    ListCell* cell = NULL;

    foreach (cell, targetList) {
        TargetEntry* entry = (TargetEntry*)lfirst(cell);
        Expr* node = entry->expr;

        Assert(IsA(node, Const) || IsA(node, Param) || IsA(node, FuncExpr) || IsA(node, RelabelType) ||
               IsA(node, OpExpr));

        /* Strip any RelabelType wrappers to reach the underlying expression. */
        while (IsA(node, RelabelType)) {
            node = ((RelabelType*)node)->arg;
        }

        /* Until proven otherwise this position does not hold a constant. */
        ConstLoc* constSlot = &m_c_global->m_targetConstLoc[pos];
        constSlot->constLoc = -1;

        if (IsA(node, FuncExpr) || IsA(node, OpExpr)) {
            /* Functions and operators are recorded the same way: the oid to call plus its arguments. */
            Oid procOid = InvalidOid;
            List* procArgs = NIL;
            if (IsA(node, FuncExpr)) {
                FuncExpr* funcNode = (FuncExpr*)node;
                procOid = funcNode->funcid;
                procArgs = funcNode->args;
            } else {
                OpExpr* opNode = (OpExpr*)node;
                procOid = opNode->opfuncid;
                procArgs = opNode->args;
            }

            FuncExprInfo* info = &m_c_global->m_targetFuncNodes[m_c_global->m_targetFuncNum];
            info->resno = entry->resno;
            info->resname = entry->resname;
            info->funcid = procOid;
            info->args = procArgs;
            ++m_c_global->m_targetFuncNum;
        } else if (IsA(node, Param)) {
            Param* paramNode = (Param*)node;
            ParamLoc* loc = &m_global->m_paramLoc[m_c_global->m_targetParamNum];
            loc->paramId = paramNode->paramid;
            loc->scanKeyIndx = pos;
            m_c_global->m_targetParamNum++;
        } else if (IsA(node, Const)) {
            Const* constNode = (Const*)node;
            constSlot->constValue = constNode->constvalue;
            constSlot->constIsNull = constNode->constisnull;
            constSlot->constLoc = pos;
        }

        pos++;
    }

    m_c_global->m_targetConstNum = pos;
}
void InsertFusion::InitGlobals()
{
m_c_global = (InsertFusionGlobalVariable*)palloc0(sizeof(InsertFusionGlobalVariable));
m_global->m_reloid = getrelid(linitial_int((List*)linitial(m_global->m_planstmt->resultRelations)),
m_global->m_planstmt->rtable);
ModifyTable* node = (ModifyTable*)m_global->m_planstmt->planTree;
BaseResult* baseresult = (BaseResult*)linitial(node->plans);
List* targetList = baseresult->plan.targetlist;
Relation rel = heap_open(m_global->m_reloid, AccessShareLock);
m_global->m_table_type = RelationIsUstoreFormat(rel) ? TAM_USTORE : TAM_HEAP;
m_global->m_exec_func_ptr = (OpFusionExecfuncType)&InsertFusion::ExecInsert;
m_global->m_natts = RelationGetDescr(rel)->natts;
m_global->m_is_bucket_rel = RELATION_OWN_BUCKET(rel);
m_global->m_tupDesc = CreateTupleDescCopy(RelationGetDescr(rel));
m_global->m_tupDesc->td_tam_ops = GetTableAmRoutine(m_global->m_table_type);
heap_close(rel, AccessShareLock);
m_global->m_paramNum = 0;
m_global->m_paramLoc = (ParamLoc*)palloc0(m_global->m_natts * sizeof(ParamLoc));
m_c_global->m_targetParamNum = 0;
m_c_global->m_targetFuncNum = 0;
m_c_global->m_targetFuncNodes = (FuncExprInfo*)palloc0(m_global->m_natts * sizeof(FuncExprInfo));
m_c_global->m_targetConstNum = 0;
m_c_global->m_targetConstLoc = (ConstLoc*)palloc0(m_global->m_natts * sizeof(ConstLoc));
InitBaseParam(targetList);
}
/*
 * InsertFusion::InitLocals
 *
 * Build the per-execution state: the executor state, the result slot and the value/null
 * arrays (one entry per attribute of the target relation), then bind the parameters.
 *
 * params: parameter list handed to initParams
 */
void InsertFusion::InitLocals(ParamListInfo params)
{
    /* Executor state bound to the current planned statement. */
    EState* execState = CreateExecutorStateForOpfusion(m_local.m_localContext, m_local.m_tmpContext);
    m_c_local.m_estate = execState;
    execState->es_range_table = m_global->m_planstmt->rtable;
    execState->es_plannedstmt = m_global->m_planstmt;

    /* Slot that carries the tuple being inserted; ustore tables need the ustore slot routines. */
    m_local.m_reslot = MakeSingleTupleTableSlot(m_global->m_tupDesc);
    if (m_global->m_table_type == TAM_USTORE) {
        m_local.m_reslot->tts_tam_ops = TableAmUstore;
    }

    /* Working arrays: the values to insert and a copy used while evaluating function targets. */
    m_local.m_values = (Datum*)palloc0(m_global->m_natts * sizeof(Datum));
    m_local.m_isnull = (bool*)palloc0(m_global->m_natts * sizeof(bool));
    m_c_local.m_curVarValue = (Datum*)palloc0(m_global->m_natts * sizeof(Datum));
    m_c_local.m_curVarIsnull = (bool*)palloc0(m_global->m_natts * sizeof(bool));

    initParams(params);

    m_local.m_receiver = NULL;
    m_local.m_isInsideRec = true;
    m_local.m_optype = INSERT_FUSION;
}
/*
 * InsertFusion constructor
 *
 * Plan-level state is either built here (inside m_global->m_context) or, when IsGlobal()
 * reports true, borrowed from the fusion object cached on psrc. Per-execution state is
 * always built, inside m_local.m_localContext.
 */
InsertFusion::InsertFusion(MemoryContext context, CachedPlanSource* psrc, List* plantree_list, ParamListInfo params)
    : OpFusion(context, psrc, plantree_list)
{
    if (IsGlobal()) {
        /* Reuse the plan-level state of the cached fusion object. */
        m_c_global = ((InsertFusion*)(psrc->opFusionObj))->m_c_global;
    } else {
        MemoryContext savedCtx = MemoryContextSwitchTo(m_global->m_context);
        InitGlobals();
        MemoryContextSwitchTo(savedCtx);
    }

    MemoryContext savedCtx = MemoryContextSwitchTo(m_local.m_localContext);
    InitLocals(params);
    MemoryContextSwitchTo(savedCtx);
}
void InsertFusion::refreshParameterIfNecessary()
{
ParamListInfo parms = m_local.m_outParams != NULL ? m_local.m_outParams : m_local.m_params;
bool func_isnull = false;
for (int i = 0; i < m_global->m_tupDesc->natts; i++) {
m_c_local.m_curVarValue[i] = m_local.m_values[i];
m_c_local.m_curVarIsnull[i] = m_local.m_isnull[i];
}
for (int i = 0; i < m_c_global->m_targetConstNum; i++) {
if (m_c_global->m_targetConstLoc[i].constLoc >= 0) {
m_local.m_values[m_c_global->m_targetConstLoc[i].constLoc] = m_c_global->m_targetConstLoc[i].constValue;
m_local.m_isnull[m_c_global->m_targetConstLoc[i].constLoc] = m_c_global->m_targetConstLoc[i].constIsNull;
}
}
for (int i = 0; i < m_c_global->m_targetFuncNum; ++i) {
ELOG_FIELD_NAME_START(m_c_global->m_targetFuncNodes[i].resname);
if (m_c_global->m_targetFuncNodes[i].funcid != InvalidOid) {
func_isnull = false;
m_local.m_values[m_c_global->m_targetFuncNodes[i].resno - 1] =
CalFuncNodeVal(m_c_global->m_targetFuncNodes[i].funcid,
m_c_global->m_targetFuncNodes[i].args,
&func_isnull,
m_c_local.m_curVarValue,
m_c_local.m_curVarIsnull);
m_local.m_isnull[m_c_global->m_targetFuncNodes[i].resno - 1] = func_isnull;
}
ELOG_FIELD_NAME_END;
}
if (m_c_global->m_targetParamNum > 0) {
for (int i = 0; i < m_c_global->m_targetParamNum; i++) {
m_local.m_values[m_global->m_paramLoc[i].scanKeyIndx] =
parms->params[m_global->m_paramLoc[i].paramId - 1].value;
m_local.m_isnull[m_global->m_paramLoc[i].scanKeyIndx] =
parms->params[m_global->m_paramLoc[i].paramId - 1].isnull;
}
}
}
/* Defined outside this file. NOTE(review): no caller is visible in this part of the file - confirm it is still needed. */
extern HeapTuple searchPgPartitionByParentIdCopy(char parttype, Oid parentId);
/*
 * ComputePartKeyExprTuple
 *
 * Evaluate a partition key expression against the tuple stored in slot.
 *
 * rel:            partitioned relation; used for the error message and, when partRel is
 *                 not valid, as the source of the partition map
 * estate:         executor state to evaluate in; when NULL a temporary one is created
 *                 and freed before returning
 * slot:           slot holding the tuple, installed as the scan tuple of the expression context
 * partRel:        optional relation whose partition map is consulted instead of rel's
 * partExprKeyStr: serialized expression tree; must be neither NULL nor empty
 *
 * Returns the computed value together with its null flag. A non-null value is copied with
 * datumCopy using the by-value/length properties of the first boundary constant of the map.
 * Raises ERROR when the expression string is missing or the partition type is not
 * range, list or hash.
 */
PartKeyExprResult ComputePartKeyExprTuple(Relation rel, EState *estate, TupleTableSlot *slot, Relation partRel, char* partExprKeyStr)
{
    if (partExprKeyStr == NULL || pg_strcasecmp(partExprKeyStr, "") == 0) {
        ereport(ERROR, (errcode(ERRCODE_PARTITION_ERROR), errmsg("The partition expr key can't be null for table %s", NameStr(rel->rd_rel->relname))));
    }

    Node* keyExpr = (Node*)stringToNode_skip_extern_fields(partExprKeyStr);
    (void)lockNextvalWalker(keyExpr, NULL);

    /* Without a caller-supplied executor state we create one and own its lifetime. */
    const bool ownsEstate = (estate == NULL);
    ExprState* keyState = NULL;
    if (ownsEstate) {
        estate = CreateExecutorState();
        keyState = ExecInitExpr(expression_planner((Expr*)keyExpr), NULL);
    } else {
        keyState = ExecPrepareExpr((Expr *)keyExpr, estate);
    }

    ExprContext* evalCtx = GetPerTupleExprContext(estate);
    evalCtx->ecxt_scantuple = slot;

    bool resultIsNull = false;
    Datum result = ExecEvalExpr(keyState, evalCtx, &resultIsNull, NULL);

    /* The first boundary constant of the partition map supplies the key's type properties. */
    Relation mapRel = PointerIsValid(partRel) ? partRel : rel;
    Const** firstBoundary = NULL;
    if (mapRel->partMap->type == PART_TYPE_RANGE) {
        firstBoundary = ((RangePartitionMap*)(mapRel->partMap))->rangeElements[0].boundary;
    } else if (mapRel->partMap->type == PART_TYPE_LIST) {
        firstBoundary = ((ListPartitionMap*)(mapRel->partMap))->listElements[0].boundary[0].values;
    } else if (mapRel->partMap->type == PART_TYPE_HASH) {
        firstBoundary = ((HashPartitionMap*)(mapRel->partMap))->hashElements[0].boundary;
    } else {
        ereport(ERROR, (errcode(ERRCODE_PARTITION_ERROR), errmsg("Unsupported partition type : %d", mapRel->partMap->type)));
    }

    /* Copy the value before a temporary executor state (if any) is released. */
    if (!resultIsNull) {
        result = datumCopy(result, firstBoundary[0]->constbyval, firstBoundary[0]->constlen);
    }

    if (ownsEstate) {
        FreeExecutorState(estate);
    }

    return {result, resultIsNull};
}
/*
 * ExecReleaseResource
 *
 * Common cleanup for the fused insert paths: free the tuple, clear the slot, close the
 * indexes of result_rel_info, finish the executor step, and close the bucket relation and
 * the partition (plus its dummy relation) when they were opened.
 *
 * bucket_rel may be NULL; part may be NULL, in which case nothing partition-related is released.
 */
static void ExecReleaseResource(Tuple tuple, TupleTableSlot *slot, ResultRelInfo *result_rel_info, EState *estate,
    Relation bucket_rel, Relation rel, Partition part, Relation partRel)
{
    tableam_tops_free_tuple(tuple);
    (void)ExecClearTuple(slot);
    ExecCloseIndices(result_rel_info);
    ExecDoneStepInFusion(estate);

    if (bucket_rel != NULL) {
        bucketCloseRelation(bucket_rel);
    }

    /* Partition resources exist only for a partitioned relation whose partition was opened. */
    const bool partitionOpened = RELATION_IS_PARTITIONED(rel) && part != NULL;
    if (!partitionOpened) {
        return;
    }
    partitionClose(rel, part, RowExclusiveLock);
    releaseDummyRelation(&partRel);
}
/*
 * InsertFusion::ExecInsert
 *
 * Insert the single row described by m_local.m_values / m_local.m_isnull into rel.
 *
 * rel:             target relation, already opened by the caller
 * result_rel_info: result relation info for rel (indexes, check options)
 *
 * Returns 1 when the row was inserted. Returns 0 when the insert was skipped:
 *   - the statement has hasIgnore set and no partition matches the tuple,
 *   - ExecConstraints reports a violation and the ignore strategy is not SQL_OVERWRITE_NULL,
 *   - the statement has hasIgnore set and ExecCheckIndexConstraints reports a conflict
 *     (a WARNING is emitted in that case).
 * All resources taken here are released through ExecReleaseResource on every return path.
 */
unsigned long InsertFusion::ExecInsert(Relation rel, ResultRelInfo* result_rel_info)
{
    /*******************
     * step 1: prepare *
     *******************/
    Relation bucket_rel = NULL;
    int2 bucketid = InvalidBktId;
    Oid partOid = InvalidOid;
    Partition part = NULL;
    Relation partRel = NULL;
    bool rel_isblockchain = rel->rd_isblockchain;
    CommandId mycid = GetCurrentCommandId(true);

    init_gtt_storage(CMD_INSERT, result_rel_info);

    /************************
     * step 2: begin insert *
     ************************/
    Tuple tuple = tableam_tops_form_tuple(m_global->m_tupDesc, m_local.m_values,
        m_local.m_isnull, rel->rd_tam_ops);
    Assert(tuple != NULL);

    /* Route the tuple to its partition; under IGNORE a missing partition skips the row. */
    if (RELATION_IS_PARTITIONED(rel)) {
        m_c_local.m_estate->esfRelations = NULL;
        int partitionno = INVALID_PARTITION_NO;
        m_local.m_reslot->tts_tuple = tuple;
        partOid = getPartitionIdFromTuple(rel, tuple, m_c_local.m_estate, m_local.m_reslot, &partitionno, false,
            m_c_local.m_estate->es_plannedstmt->hasIgnore);
        if (m_c_local.m_estate->es_plannedstmt->hasIgnore && partOid == InvalidOid) {
            ExecReleaseResource(tuple, m_local.m_reslot, result_rel_info, m_c_local.m_estate, bucket_rel, rel, part,
                partRel);
            return 0;
        }
        part = PartitionOpenWithPartitionno(rel, partOid, partitionno, RowExclusiveLock);
        partRel = partitionGetRelation(rel, part);
    }

    if (m_global->m_is_bucket_rel) {
        bucketid = computeTupleBucketId(result_rel_info->ri_RelationDesc, (HeapTuple)tuple);
        bucket_rel = InitBucketRelation(bucketid, rel, part);
    }

    (void)ExecStoreTuple(tuple, m_local.m_reslot, InvalidBuffer, false);

    /*
     * Compute stored generated columns
     */
    if (result_rel_info->ri_RelationDesc->rd_att->constr &&
        result_rel_info->ri_RelationDesc->rd_att->constr->has_generated_stored) {
        ExecComputeStoredGenerated(result_rel_info, m_c_local.m_estate, m_local.m_reslot, tuple, CMD_INSERT);
        /* The slot may now hold a different tuple; drop ours and follow the slot. */
        if (tuple != m_local.m_reslot->tts_tuple) {
            tableam_tops_free_tuple(tuple);
            tuple = m_local.m_reslot->tts_tuple;
        }
    }

    if (rel->rd_att->constr) {
        /*
         * If values violate constraints, directly return.
         */
        if (!ExecConstraints(result_rel_info, m_local.m_reslot, m_c_local.m_estate, true)) {
            if (u_sess->utils_cxt.sql_ignore_strategy_val != SQL_OVERWRITE_NULL) {
                ExecReleaseResource(tuple, m_local.m_reslot, result_rel_info, m_c_local.m_estate, bucket_rel, rel, part,
                    partRel);
                return 0;
            }
            bool can_ignore = m_c_local.m_estate->es_plannedstmt && m_c_local.m_estate->es_plannedstmt->hasIgnore;
            tuple =
                ReplaceTupleNullCol(RelationGetDescr(result_rel_info->ri_RelationDesc), m_local.m_reslot, can_ignore);
            /* Double check constraints in case that new val in column with not null constraints
             * violated check constraints */
            ExecConstraints(result_rel_info, m_local.m_reslot, m_c_local.m_estate, true);
        }
        tuple = ExecAutoIncrement(rel, m_c_local.m_estate, m_local.m_reslot, tuple);
        if (tuple != m_local.m_reslot->tts_tuple) {
            tableam_tops_free_tuple(tuple);
            tuple = m_local.m_reslot->tts_tuple;
        }
    }

    if (!IGNORE_UNUSED_INDEX_CHECK_ON_DML) {
        CheckIndexDisableValid(result_rel_info, m_c_local.m_estate);
    }

    /* Physical destination: the bucket relation if any, else the partition, else the table itself. */
    Relation destRel = RELATION_IS_PARTITIONED(rel) ? partRel : rel;
    Relation target_rel = (bucket_rel == NULL) ? destRel : bucket_rel;

    /* Ledger (blockchain) heap tables: attach the row hash and record it in the history table. */
    if (rel_isblockchain && (!RelationIsUstoreFormat(rel))) {
        HeapTuple tmp_tuple = (HeapTuple)tuple;
        MemoryContext old_context = MemoryContextSwitchTo(m_local.m_tmpContext);
        tuple = set_user_tuple_hash(tmp_tuple, target_rel, NULL);
        (void)ExecStoreTuple(tuple, m_local.m_reslot, InvalidBuffer, false);
        m_local.m_ledger_hash_exist = hist_table_record_insert(target_rel, (HeapTuple)tuple, &m_local.m_ledger_relhash);
        (void)MemoryContextSwitchTo(old_context);
        tableam_tops_free_tuple(tmp_tuple);
    }

    /* Under IGNORE, probe the indexes first so a conflict becomes a warning instead of an error. */
    bool isgpi = false;
    ConflictInfoData conflictInfo;
    Oid conflictPartOid = InvalidOid;
    int2 conflictBucketid = InvalidBktId;
    if (m_c_local.m_estate->es_plannedstmt && m_c_local.m_estate->es_plannedstmt->hasIgnore &&
        !ExecCheckIndexConstraints(m_local.m_reslot, m_c_local.m_estate, target_rel, part, &isgpi, bucketid,
            &conflictInfo, &conflictPartOid, &conflictBucketid)) {
        ereport(WARNING, (errmsg("duplicate key value violates unique constraint in table \"%s\"",
            RelationGetRelationName(target_rel))));
        ExecReleaseResource(tuple, m_local.m_reslot, result_rel_info, m_c_local.m_estate, bucket_rel, rel, part,
            partRel);
        return 0;
    }

    (void)tableam_tuple_insert(target_rel, tuple, mycid, 0, NULL);

    /* Feed the mlog table of a non-partitioned relation when one is attached (rd_mlogoid is set). */
    if (!RELATION_IS_PARTITIONED(rel)) {
        if (rel != NULL && rel->rd_mlogoid != InvalidOid) {
            HeapTuple htuple = NULL;
            if (rel->rd_tam_ops == TableAmUstore) {
                htuple = UHeapToHeap(rel->rd_att, (UHeapTuple)tuple);
                insert_into_mlog_table(rel, rel->rd_mlogoid, htuple, &htuple->t_self,
                    GetCurrentTransactionId(), 'I');
            } else {
                insert_into_mlog_table(rel, rel->rd_mlogoid, (HeapTuple)tuple, &((HeapTuple)tuple)->t_self,
                    GetCurrentTransactionId(), 'I');
            }
        }
    }

    /* Maintain the indexes; the recheck list is not needed here and is released immediately. */
    List* recheck_indexes = NIL;
    if (result_rel_info->ri_NumIndices > 0) {
        recheck_indexes = ExecInsertIndexTuples(m_local.m_reslot,
            &(((HeapTuple)tuple)->t_self),
            m_c_local.m_estate,
            RELATION_IS_PARTITIONED(rel) ? partRel : NULL,
            RELATION_IS_PARTITIONED(rel) ? part : NULL,
            bucketid, NULL, NULL);
    }
    list_free_ext(recheck_indexes);

    if (result_rel_info->ri_WithCheckOptions != NIL)
        ExecWithCheckOptions(result_rel_info, m_local.m_reslot, m_c_local.m_estate);

#ifdef ENABLE_HTAP
    if (HAVE_HTAP_TABLES) {
        IMCStoreInsertHook(RelationGetRelid(rel), tableam_tops_get_t_self(rel, tuple));
    }
#endif

    /****************
     * step 3: done *
     ****************/
    ExecReleaseResource(tuple, m_local.m_reslot, result_rel_info, m_c_local.m_estate, bucket_rel, rel, part, partRel);
    return 1;
}
bool InsertFusion::execute(long max_rows, char* completionTag)
{
bool success = false;
errno_t errorno = EOK;
* step 1: prepare *
*******************/
Relation rel = heap_open(m_global->m_reloid, RowExclusiveLock);
ResultRelInfo* result_rel_info = makeNode(ResultRelInfo);
InitResultRelInfo(result_rel_info, rel, 1, 0);
if (result_rel_info->ri_RelationDesc->rd_rel->relhasindex) {
bool speculative = (m_c_local.m_estate->es_plannedstmt && m_c_local.m_estate->es_plannedstmt->hasIgnore) ||
(m_global->m_planstmt && m_global->m_planstmt->hasIgnore);
ExecOpenIndices(result_rel_info, speculative);
}
init_gtt_storage(CMD_INSERT, result_rel_info);
m_c_local.m_estate->es_result_relation_info = result_rel_info;
m_c_local.m_estate->es_plannedstmt = m_global->m_planstmt;
refreshParameterIfNecessary();
ModifyTable* node = (ModifyTable*)(m_global->m_planstmt->planTree);
PlanState* ps = NULL;
if (node->withCheckOptionLists != NIL) {
Plan* plan = (Plan*)linitial(node->plans);
ps = ExecInitNode(plan, m_c_local.m_estate, 0);
List* wcoList = (List*)linitial(node->withCheckOptionLists);
List* wcoExprs = NIL;
ListCell* ll = NULL;
foreach(ll, wcoList) {
WithCheckOption* wco = (WithCheckOption*)lfirst(ll);
ExprState* wcoExpr = NULL;
if (ps->state->es_is_flt_frame) {
wcoExpr = ExecInitQualByFlatten((List*)wco->qual, ps);
} else {
wcoExpr = ExecInitExprByRecursion((Expr*)wco->qual, ps);
}
wcoExprs = lappend(wcoExprs, wcoExpr);
}
result_rel_info->ri_WithCheckOptions = wcoList;
result_rel_info->ri_WithCheckOptionExprs = wcoExprs;
}
* step 2: begin insert *
************************/
unsigned long nprocessed = (this->*(m_global->m_exec_func_ptr))(rel, result_rel_info);
heap_close(rel, NoLock);
* step 3: done *
****************/
if (ps != NULL) {
ExecEndNode(ps);
}
success = true;
m_local.m_isCompleted = true;
if (m_local.m_ledger_hash_exist && !IsConnFromApp()) {
errorno = snprintf_s(completionTag, COMPLETION_TAG_BUFSIZE, COMPLETION_TAG_BUFSIZE - 1,
"INSERT 0 %ld %lu\0", nprocessed, m_local.m_ledger_relhash);
} else {
errorno =
snprintf_s(completionTag, COMPLETION_TAG_BUFSIZE, COMPLETION_TAG_BUFSIZE - 1, "INSERT 0 %ld", nprocessed);
}
securec_check_ss(errorno, "\0", "\0");
FreeExecutorStateForOpfusion(m_c_local.m_estate);
u_sess->statement_cxt.current_row_count = nprocessed;
u_sess->statement_cxt.last_row_count = u_sess->statement_cxt.current_row_count;
if (u_sess->hook_cxt.rowcountHook) {
((RowcountHook)(u_sess->hook_cxt.rowcountHook))(nprocessed);
}
return success;
}
* reset InsertFusion for reuse:
* such as
* insert into t values (1, 'a');
* insert into t values (2, 'b');
* only need to replace the planstmt
*/
bool InsertFusion::ResetReuseFusion(MemoryContext context, CachedPlanSource* psrc, List* plantree_list, ParamListInfo params)
{
PlannedStmt *curr_plan = (PlannedStmt *)linitial(plantree_list);
m_global->m_planstmt = curr_plan;
ModifyTable* node = (ModifyTable*)m_global->m_planstmt->planTree;
BaseResult* baseresult = (BaseResult*)linitial(node->plans);
List* targetList = baseresult->plan.targetlist;
m_c_global->m_targetFuncNum = 0;
m_c_global->m_targetParamNum = 0;
InitBaseParam(targetList);
m_local.m_reslot->tts_tam_ops = GetTableAmRoutine(m_global->m_table_type);
m_c_local.m_estate->es_range_table = NIL;
m_c_local.m_estate->es_range_table = m_global->m_planstmt->rtable;
m_c_local.m_estate->es_plannedstmt = m_global->m_planstmt;
initParams(params);
return true;
}
void InsertSubFusion::InitGlobals()
{
m_c_global = (InsertSubFusionGlobalVariable*)palloc0(sizeof(InsertSubFusionGlobalVariable));
m_global->m_reloid = getrelid(linitial_int((List*)linitial(m_global->m_planstmt->resultRelations)),
m_global->m_planstmt->rtable);
ModifyTable* node = (ModifyTable*)m_global->m_planstmt->planTree;
SeqScan* seqscan = (SeqScan*)linitial(node->plans);
List* targetList = seqscan->plan.targetlist;
m_c_local.m_ss_plan = (SeqScan*)copyObject(seqscan);
m_c_local.m_plan = (Plan*)copyObject(node);
Relation rel = heap_open(m_global->m_reloid, AccessShareLock);
m_global->m_table_type = RelationIsUstoreFormat(rel) ? TAM_USTORE : TAM_HEAP;
m_global->m_exec_func_ptr = (OpFusionExecfuncType)&InsertSubFusion::ExecInsert;
m_global->m_natts = RelationGetDescr(rel)->natts;
m_global->m_is_bucket_rel = RELATION_OWN_BUCKET(rel);
m_global->m_tupDesc = CreateTupleDescCopy(RelationGetDescr(rel));
m_global->m_tupDesc->td_tam_ops = GetTableAmRoutine(m_global->m_table_type);
heap_close(rel, AccessShareLock);
m_c_global->m_targetVarLoc = (VarLoc*)palloc0(m_global->m_natts * sizeof(VarLoc));
m_c_global->m_varNum = 0;
ListCell* lc = NULL;
int i = 0;
TargetEntry* res = NULL;
Expr* expr = NULL;
foreach (lc, targetList) {
res = (TargetEntry*)lfirst(lc);
expr = res->expr;
Assert(
IsA(expr, Var) || IsA(expr, RelabelType)
);
while (IsA(expr, RelabelType)) {
expr = ((RelabelType*)expr)->arg;
}
if (IsA(expr, Var)) {
Var* var = (Var*)expr;
m_c_global->m_targetVarLoc[m_c_global->m_varNum].varNo = var->varattno;
m_c_global->m_targetVarLoc[m_c_global->m_varNum++].scanKeyIndx = i;
}
i++;
}
m_c_global->m_targetConstNum = i;
}
/*
 * InsertSubFusion::InitLocals
 *
 * Build the per-execution state: the executor state, the result slot and the value/null
 * arrays (one entry per attribute of the target relation), then bind the parameters.
 *
 * params: parameter list handed to initParams
 */
void InsertSubFusion::InitLocals(ParamListInfo params)
{
    /* Executor state bound to the current planned statement. */
    EState* execState = CreateExecutorStateForOpfusion(m_local.m_localContext, m_local.m_tmpContext);
    m_c_local.m_estate = execState;
    execState->es_range_table = m_global->m_planstmt->rtable;
    execState->es_plannedstmt = m_global->m_planstmt;

    /* Result slot; ustore tables need the ustore slot routines. */
    m_local.m_reslot = MakeSingleTupleTableSlot(m_global->m_tupDesc);
    if (m_global->m_table_type == TAM_USTORE) {
        m_local.m_reslot->tts_tam_ops = TableAmUstore;
    }

    /* Per-attribute working arrays. */
    m_local.m_values = (Datum*)palloc0(m_global->m_natts * sizeof(Datum));
    m_local.m_isnull = (bool*)palloc0(m_global->m_natts * sizeof(bool));
    m_c_local.m_curVarValue = (Datum*)palloc0(m_global->m_natts * sizeof(Datum));
    m_c_local.m_curVarIsnull = (bool*)palloc0(m_global->m_natts * sizeof(bool));

    initParams(params);

    m_local.m_receiver = NULL;
    m_local.m_isInsideRec = true;
    m_local.m_optype = INSERT_SUB_FUSION;
}
/*
 * InsertSubFusion constructor
 *
 * Plan-level state is either built here (inside m_global->m_context) or, when IsGlobal()
 * reports true, borrowed from the fusion object cached on psrc. Per-execution state is
 * always built, inside m_local.m_localContext.
 */
InsertSubFusion::InsertSubFusion(MemoryContext context, CachedPlanSource* psrc, List* plantree_list, ParamListInfo params)
    : OpFusion(context, psrc, plantree_list)
{
    if (IsGlobal()) {
        /* Reuse the plan-level state of the cached fusion object. */
        m_c_global = ((InsertSubFusion*)(psrc->opFusionObj))->m_c_global;
    } else {
        MemoryContext savedCtx = MemoryContextSwitchTo(m_global->m_context);
        InitGlobals();
        MemoryContextSwitchTo(savedCtx);
    }

    MemoryContext savedCtx = MemoryContextSwitchTo(m_local.m_localContext);
    InitLocals(params);
    MemoryContextSwitchTo(savedCtx);
}
* ExecInsertIndexTuplesOpfusion
*
* This routine takes care of inserting index tuples
* into all the relations indexing the result relation
* when a heap tuple is inserted into the result relation.
* It is similar to ExecInsertIndexTuples.
*/
static List* ExecInsertIndexTuplesOpfusion(TupleTableSlot* slot, ItemPointer tupleid, EState* estate,
Relation targetPartRel, Partition p, int2 bucketId, bool* conflict,
Bitmapset* modifiedIdxAttrs)
{
List* result = NIL;
ResultRelInfo* resultRelInfo = NULL;
int i;
int numIndices;
RelationPtr relationDescs;
IndexInfo** indexInfoArray;
Datum values[INDEX_MAX_KEYS];
bool isnull[INDEX_MAX_KEYS];
Relation actualheap;
bool containGPI;
List* partitionIndexOidList = NIL;
* Get information from the result relation info structure.
*/
resultRelInfo = estate->es_result_relation_info;
numIndices = resultRelInfo->ri_NumIndices;
relationDescs = resultRelInfo->ri_IndexRelationDescs;
indexInfoArray = resultRelInfo->ri_IndexRelationInfo;
actualheap = resultRelInfo->ri_RelationDesc;
containGPI = resultRelInfo->ri_ContainGPI;
* for each index, form and insert the index tuple
*/
for (i = 0; i < numIndices; i++) {
Relation indexRelation = relationDescs[i];
IndexInfo* indexInfo = NULL;
IndexUniqueCheck checkUnique;
bool satisfiesConstraint = false;
Relation actualindex = NULL;
if (indexRelation == NULL) {
continue;
}
indexInfo = indexInfoArray[i];
if (!indexInfo->ii_ReadyForInserts) {
continue;
}
actualindex = indexRelation;
* FormIndexDatum fills in its values and isnull parameters with the
* appropriate values for the column(s) of the index.
*/
FormIndexDatum(indexInfo, slot, estate, values, isnull);
* The index AM does the actual insertion, plus uniqueness checking.
*
* For an immediate-mode unique index, we just tell the index AM to
* throw error if not unique.
*
* For a deferrable unique index, we tell the index AM to just detect
* possible non-uniqueness, and we add the index OID to the result
* list if further checking is needed.
*/
if (!indexRelation->rd_index->indisunique) {
checkUnique = UNIQUE_CHECK_NO;
} else if (conflict != NULL) {
checkUnique = UNIQUE_CHECK_PARTIAL;
} else if (indexRelation->rd_index->indimmediate) {
checkUnique = UNIQUE_CHECK_YES;
} else {
checkUnique = UNIQUE_CHECK_PARTIAL;
}
satisfiesConstraint = index_insert(actualindex,
values,
isnull,
tupleid,
actualheap,
checkUnique);
* If the index has an associated exclusion constraint, check that.
* This is simpler than the process for uniqueness checks since we
* always insert first and then check. If the constraint is deferred,
* we check now anyway, but don't throw error on violation; instead
* we'll queue a recheck event.
*
* An index for an exclusion constraint can't also be UNIQUE (not an
* essential property, we just don't allow it in the grammar), so no
* need to preserve the prior state of satisfiesConstraint.
*/
if (indexInfo->ii_ExclusionOps != NULL) {
bool errorOK = !actualindex->rd_index->indimmediate;
satisfiesConstraint = check_exclusion_constraint(
actualheap, actualindex, indexInfo, tupleid, values, isnull, estate, false, errorOK);
}
if ((checkUnique == UNIQUE_CHECK_PARTIAL || indexInfo->ii_ExclusionOps != NULL) && !satisfiesConstraint) {
* The tuple potentially violates the uniqueness or exclusion
* constraint, so make a note of the index so that we can re-check
* it later. Speculative inserters are told if there was a
* speculative conflict, since that always requires a restart.
*/
result = lappend_oid(result, RelationGetRelid(indexRelation));
if (conflict != NULL) {
*conflict = true;
}
}
}
list_free_ext(partitionIndexOidList);
return result;
}
* check attribute isnull constrain
*
* if we find a isnull attribute, return false, otherwise return true.
*/
bool check_attisnull_exist(ResultRelInfo* ResultRelInfo, TupleTableSlot* slot)
{
Relation rel = ResultRelInfo->ri_RelationDesc;
TupleDesc tupdesc = RelationGetDescr(rel);
TupleConstr* constr = tupdesc->constr;
if (constr->has_not_null) {
int natts = tupdesc->natts;
int attrChk;
for (attrChk = 1; attrChk <= natts; attrChk++) {
Form_pg_attribute att = TupleDescAttr(tupdesc, attrChk -1);
if (att->attnotnull && tableam_tslot_attisnull(slot, attrChk)) {
return false;
}
}
}
if (constr->num_check == 0) {
return true;
}
return false;
}
* insert_real
*
* execute insert a slot to target relation
* state: ModifyTableState state of current plan
* slot: slot be inserted
* estate: execute state
* canSetTag: do we set the command tag/es_processed?
*/
TupleTableSlot* insert_real(ModifyTableState* state, TupleTableSlot* slot, EState* estate, bool canSetTag)
{
Tuple tuple = NULL;
ResultRelInfo* result_rel_info = NULL;
Relation result_relation_desc;
Oid new_id = InvalidOid;
List* recheck_indexes = NIL;
Partition partition = NULL;
Relation target_rel = NULL;
ItemPointer pTSelf = NULL;
bool rel_isblockchain = false;
int2 bucket_id = InvalidBktId;
#ifdef ENABLE_MULTIPLE_NDOES
RemoteQueryState* result_remote_rel = NULL;
#endif
* get information on the (current) result relation
*/
result_rel_info = estate->es_result_relation_info;
result_relation_desc = result_rel_info->ri_RelationDesc;
rel_isblockchain = result_relation_desc->rd_isblockchain;
* get the heap tuple out of the tuple table slot, making sure we have a
* writable copy
*/
tuple = tableam_tslot_get_tuple_from_slot(result_rel_info->ri_RelationDesc, slot);
#ifdef ENABLE_MULTIPLE_NDOES
result_remote_rel = (RemoteQueryState*)estate->es_result_remoterel;
#endif
* If the result relation has OIDs, force the tuple's OID to zero so that
* heap_insert will assign a fresh OID. Usually the OID already will be
* zero at this point, but there are corner cases where the plan tree can
* return a tuple extracted literally from some table with the same
* rowtype.
*
* XXX if we ever wanted to allow users to assign their own OIDs to new
* rows, this'd be the place to do it. For the moment, we make a point of
* doing this before calling triggers, so that a user-supplied trigger
* could hack the OID if desired.
*/
* Check the constraints of the tuple
*/
if (result_relation_desc->rd_att->constr) {
TupleTableSlot *tmp_slot = state->mt_insert_constr_slot == NULL ? slot : state->mt_insert_constr_slot;
if (likely(check_attisnull_exist(result_rel_info, tmp_slot))) {
} else if (!ExecConstraints(result_rel_info, tmp_slot, estate, true)) {
if (u_sess->utils_cxt.sql_ignore_strategy_val == SQL_OVERWRITE_NULL) {
bool canIgnore = estate->es_plannedstmt && estate->es_plannedstmt->hasIgnore;
tuple = ReplaceTupleNullCol(RelationGetDescr(result_relation_desc), tmp_slot, canIgnore);
* Double check constraints in case that new val in column with not null constraints
* violated check constraints
*/
ExecConstraints(result_rel_info, tmp_slot, estate, true);
} else {
return NULL;
}
}
}
* insert the tuple
* Note: heap_insert returns the tid (location) of the new tuple in
* the t_self field.
*/
new_id = InvalidOid;
bool isgpi = false;
ConflictInfoData conflictInfo;
Oid conflictPartOid = InvalidOid;
int2 conflictBucketid = InvalidBktId;
target_rel = result_rel_info->ri_RelationDesc;
if (estate->es_plannedstmt && estate->es_plannedstmt->hasIgnore &&
!ExecCheckIndexConstraints(slot, estate, target_rel, partition, &isgpi,
bucket_id, &conflictInfo, &conflictPartOid,
&conflictBucketid)) {
ereport(WARNING,
(errmsg("duplicate key value violates unique constraint in table \"%s\"",
RelationGetRelationName(target_rel))));
return NULL;
}
new_id = tableam_tuple_insert(target_rel, tuple, estate->es_output_cid, 0, NULL);
if (result_rel_info->ri_NumIndices > 0) {
pTSelf = tableam_tops_get_t_self(target_rel, tuple);
recheck_indexes = ExecInsertIndexTuplesOpfusion(slot, pTSelf, estate, NULL, NULL,
InvalidBktId, NULL, NULL);
list_free_ext(recheck_indexes);
}
if (canSetTag) {
(estate->es_processed)++;
estate->es_lastoid = new_id;
}
return NULL;
}
/*
 * InsertSubFusion::ExecInsert
 *
 * Drive an INSERT ... SELECT: pull rows from the sequential scan in m_c_local.m_sub_ps
 * and hand each one to insert_real until the scan is exhausted.
 *
 * rel:             target relation, passed on to the final resource release
 * result_rel_info: overwritten inside the loop with the result relation currently selected
 *                  by estate->result_rel_index
 *
 * Returns estate->es_processed, i.e. the row count maintained by insert_real.
 */
unsigned long InsertSubFusion::ExecInsert(Relation rel, ResultRelInfo* result_rel_info)
{
    /*******************
     * step 1: prepare *
     *******************/
    Relation bucket_rel = NULL;
    Partition part = NULL;
    Relation partRel = NULL;

    init_gtt_storage(CMD_INSERT, result_rel_info);

    ModifyTableState* node = m_c_local.m_mt_state;
    EState* estate = m_c_local.m_estate;
    estate->es_output_cid = GetCurrentCommandId(true);
    int resultRelationNum = node->mt_ResultTupleSlots ? list_length(node->mt_ResultTupleSlots) : 1;

    /************************
     * step 2: begin insert *
     ************************/
    SeqScanState* subPlanState = (SeqScanState*)m_c_local.m_sub_ps;
    /* No tuple is owned at this level; NULL is what gets passed to the release below. */
    Tuple tuple = NULL;

    for (;;) {
        result_rel_info = node->resultRelInfo + estate->result_rel_index;
        estate->es_result_relation_info = result_rel_info;
        ResetPerTupleExprContext(estate);

        TupleTableSlot* slot = SeqNext(subPlanState);
        if (TupIsNull(slot)) {
            record_first_time();
            break;
        }

        (void)insert_real(node, slot, estate, node->canSetTag);
        record_first_time();

        /* Advance to the next result relation, wrapping around after the last one. */
        if (estate->result_rel_index == resultRelationNum - 1) {
            estate->result_rel_index = 0;
        } else {
            estate->result_rel_index++;
        }
    }

    uint64 nprocessed = estate->es_processed;

    /****************
     * step 3: done *
     ****************/
    ExecReleaseResource(tuple, m_local.m_reslot, result_rel_info, m_c_local.m_estate, bucket_rel, rel, part, partRel);
    return nprocessed;
}
/*
 * InsertSubFusion::InitPlan
 *
 * Initialize the plan state tree for the copied ModifyTable plan and keep both the generic
 * plan state pointer and its ModifyTableState view in the local state.
 */
void InsertSubFusion::InitPlan()
{
    m_c_local.m_ps = ExecInitNode(m_c_local.m_plan, m_c_local.m_estate, 0);
    m_c_local.m_mt_state = castNode(ModifyTableState, m_c_local.m_ps);
}
bool InsertSubFusion::execute(long max_rows, char* completionTag)
{
bool success = false;
errno_t errorno = EOK;
* step 1: prepare *
*******************/
ResultRelInfo* saved_result_rel_info = NULL;
ResultRelInfo* result_rel_info = NULL;
PlanState* subPlanState = NULL;
#ifdef ENABLE_MULTIPLE_NODES
PlanState* remote_rel_state = NULL;
PlanState* insert_remote_rel_state = NULL;
PlanState* update_remote_rel_state = NULL;
PlanState* delete_remote_rel_state = NULL;
PlanState* saved_result_remote_rel = NULL;
#endif
Relation rel = heap_open(m_global->m_reloid, RowExclusiveLock);
result_rel_info = makeNode(ResultRelInfo);
InitResultRelInfo(result_rel_info, rel, 1, 0);
m_c_local.m_estate->es_snapshot = GetActiveSnapshot();
m_c_local.m_estate->es_result_relation_info = result_rel_info;
m_c_local.m_estate->es_result_relations = result_rel_info;
InitPlan();
ModifyTableState* node = m_c_local.m_mt_state;
EState* estate = m_c_local.m_estate;
subPlanState = node->mt_plans[0];
#ifdef ENABLE_MULTIPLE_NODES
remote_rel_state = node->mt_remoterels[0];
insert_remote_rel_state = node->mt_insert_remoterels[0];
update_remote_rel_state = node->mt_update_remoterels[0];
delete_remote_rel_state = node->mt_delete_remoterels[0];
#endif
* es_result_relation_info must point to the currently active result
* relation while we are within this ModifyTable node. Even though
* ModifyTable nodes can't be nested statically, they can be nested
* dynamically (since our subplan could include a reference to a modifying
* CTE). So we have to save and restore the caller's value.
*/
saved_result_rel_info = estate->es_result_relation_info;
#ifdef ENABLE_MULTIPLE_NODES
saved_result_remote_rel = estate->es_result_remoterel;
#endif
#ifdef ENABLE_MULTIPLE_NODES
estate->es_result_remoterel = remote_rel_state;
estate->es_result_insert_remoterel = insert_remote_rel_state;
estate->es_result_update_remoterel = update_remote_rel_state;
estate->es_result_delete_remoterel = delete_remote_rel_state;
#endif
estate->es_plannedstmt = m_global->m_planstmt;
m_c_local.m_sub_ps = castNode(SeqScanState, subPlanState);
* step 2: begin insert *
************************/
unsigned long nprocessed = (this->*(m_global->m_exec_func_ptr))(rel, result_rel_info);
heap_close(rel, RowExclusiveLock);
* step 3: done *
****************/
success = true;
m_local.m_isCompleted = true;
if (m_local.m_ledger_hash_exist && !IsConnFromApp()) {
errorno = snprintf_s(completionTag, COMPLETION_TAG_BUFSIZE, COMPLETION_TAG_BUFSIZE - 1,
"INSERT 0 %ld %lu\0", nprocessed, m_local.m_ledger_relhash);
} else {
errorno =
snprintf_s(completionTag, COMPLETION_TAG_BUFSIZE, COMPLETION_TAG_BUFSIZE - 1, "INSERT 0 %ld", nprocessed);
}
securec_check_ss(errorno, "\0", "\0");
ExecEndPlan(m_c_local.m_ps, m_c_local.m_estate);
FreeExecutorStateForOpfusion(m_c_local.m_estate);
u_sess->statement_cxt.current_row_count = nprocessed;
u_sess->statement_cxt.last_row_count = u_sess->statement_cxt.current_row_count;
return success;
}