/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* execMerge.cpp
* routines to handle Merge nodes relating to the MERGE command
*
* IDENTIFICATION
* src/gausskernel/runtime/executor/execMerge.cpp
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/tableam.h"
#include "executor/executor.h"
#include "executor/node/nodeModifyTable.h"
#include "executor/exec/execMerge.h"
#include "miscadmin.h"
#include "nodes/nodeFuncs.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "access/heapam.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
/*
 * Forward declarations of the file-local helpers defined further down.
 * partExprKeyStr is optional in both action executors and defaults to NULL.
 */
static void ExecMergeNotMatched(ModifyTableState* mtstate, EState* estate, TupleTableSlot* slot, char* partExprKeyStr = NULL);
static bool ExecMergeMatched(ModifyTableState* mtstate, EState* estate, TupleTableSlot* slot, JunkFilter* junkfilter,
ItemPointer tupleid, HeapTupleHeader oldtuple, Oid oldPartitionOid, int2 bucketid, char* partExprKeyStr = NULL);
static LockTupleMode ExecUpdateLockMode(EState *estate, ResultRelInfo *relinfo);
/*
 * GET_ALL_UPDATED_COLUMNS
 *     bms_union() of the updatedCols and extraUpdatedCols sets of the
 *     range-table entry addressed by relinfo->ri_RangeTableIndex.
 *
 * Both macro arguments are expanded twice (one exec_rt_fetch() per set), so
 * callers should pass side-effect-free expressions.
 */
#define GET_ALL_UPDATED_COLUMNS(relinfo, estate) \
(bms_union(exec_rt_fetch((relinfo)->ri_RangeTableIndex, estate)->updatedCols, \
exec_rt_fetch((relinfo)->ri_RangeTableIndex, estate)->extraUpdatedCols))
* ExecUpdateLockMode -- find the appropriate UPDATE tuple lock mode for a
* given ResultRelInfo
*/
static LockTupleMode
ExecUpdateLockMode(EState *estate, ResultRelInfo *relinfo)
{
Bitmapset *keyCols;
Bitmapset *updatedCols;
* Compute lock mode to use. If columns that are part of the key have not
* been modified, then we can use a weaker lock, allowing for better
* concurrency.
*/
updatedCols = GET_ALL_UPDATED_COLUMNS(relinfo, estate);
keyCols = RelationGetIndexAttrBitmap(relinfo->ri_RelationDesc,
INDEX_ATTR_BITMAP_KEY);
if (bms_overlap(keyCols, updatedCols))
return LockTupleExclusive;
return LockTupleNoKeyExclusive;
}
* Perform MERGE.
*/
void ExecMerge(ModifyTableState* mtstate, EState* estate, TupleTableSlot* slot, JunkFilter* junkfilter,
ResultRelInfo* resultRelInfo, char* partExprKeyStr)
{
ExprContext* econtext = mtstate->ps.ps_ExprContext;
ItemPointer tupleid;
ItemPointerData tuple_ctid;
bool matched = false;
Datum datum;
bool isNull = false;
HeapTupleHeader oldtuple = NULL;
Oid oldPartitionOid = InvalidOid;
AttrNumber partOidNum;
AttrNumber bucketIdNum;
int2 bucketid = InvalidBktId;
Assert(resultRelInfo->ri_RelationDesc->rd_rel->relkind == RELKIND_RELATION ||
resultRelInfo->ri_RelationDesc->rd_rel->relkind == PARTTYPE_PARTITIONED_RELATION ||
junkfilter != NULL);
* Reset per-tuple memory context to free any expression evaluation
* storage allocated in the previous cycle.
*/
ResetExprContext(econtext);
* We run a JOIN between the target relation and the source relation to
* find a set of candidate source rows that has matching row in the target
* table and a set of candidate source rows that does not have matching
* row in the target table. If the join returns us a tuple with target
* relation's tid set, that implies that the join found a matching row for
* the given source tuple. This case triggers the WHEN MATCHED clause of
* the MERGE. Whereas a NULL in the target relation's ctid column
* indicates a NOT MATCHED case.
*/
datum = ExecGetJunkAttribute(slot, junkfilter->jf_junkAttNo, &isNull);
if (!isNull) {
matched = true;
tupleid = (ItemPointer)DatumGetPointer(datum);
tuple_ctid = *tupleid;
tupleid = &tuple_ctid;
if (RELATION_IS_PARTITIONED(resultRelInfo->ri_RelationDesc) ||
RelationIsCUFormat(resultRelInfo->ri_RelationDesc)) {
Datum tableOiddatum;
bool tableOidisnull = false;
partOidNum = resultRelInfo->ri_partOidAttNum;
tableOiddatum = ExecGetJunkAttribute(slot, partOidNum, &tableOidisnull);
if (tableOidisnull) {
ereport(ERROR,
(errcode(ERRCODE_NULL_JUNK_ATTRIBUTE), errmsg("tableoid is null when merge partitioned table")));
}
oldPartitionOid = DatumGetObjectId(tableOiddatum);
}
if (RELATION_HAS_BUCKET(resultRelInfo->ri_RelationDesc)) {
Datum bucketIddatum;
bool bucketIdisnull = false;
bucketIdNum = resultRelInfo->ri_bucketIdAttNum;
bucketIddatum = ExecGetJunkAttribute(slot, bucketIdNum, &bucketIdisnull);
if (bucketIdisnull) {
ereport(ERROR, (errcode(ERRCODE_NULL_JUNK_ATTRIBUTE), errmsg("bucketid is null when merge table")));
}
bucketid = DatumGetObjectId(bucketIddatum);
}
} else {
matched = false;
tupleid = NULL;
}
* If we are dealing with a WHEN MATCHED case, we execute the first action
* for which the additional WHEN MATCHED AND quals pass. If an action
* without quals is found, that action is executed.
*
* Similarly, if we are dealing with WHEN NOT MATCHED case, we look at the
* given WHEN NOT MATCHED actions in sequence until one passes.
*
* Things get interesting in case of concurrent update/delete of the
* target tuple. Such concurrent update/delete is detected while we are
* executing a WHEN MATCHED action.
*
* A concurrent update can:
*
* 1. modify the target tuple so that it no longer satisfies the
* additional quals attached to the current WHEN MATCHED action OR
*
* In this case, we are still dealing with a WHEN MATCHED case, but
* we should recheck the list of WHEN MATCHED actions and choose the first
* one that satisfies the new target tuple.
*
* 2. modify the target tuple so that the join quals no longer pass and
* hence the source tuple no longer has a match.
*
* In the second case, the source tuple no longer matches the target tuple,
* so we now instead find a qualifying WHEN NOT MATCHED action to execute.
*
* A concurrent delete, changes a WHEN MATCHED case to WHEN NOT MATCHED.
*
* ExecMergeMatched takes care of following the update chain and
* re-finding the qualifying WHEN MATCHED action, as long as the updated
* target tuple still satisfies the join quals i.e. it still remains a
* WHEN MATCHED case. If the tuple gets deleted or the join quals fail, it
* returns and we try ExecMergeNotMatched. Given that ExecMergeMatched
* always make progress by following the update chain and we never switch
* from ExecMergeNotMatched to ExecMergeMatched, there is no risk of a
* livelock.
*/
if (matched)
matched = ExecMergeMatched(mtstate, estate, slot, junkfilter, tupleid, oldtuple, oldPartitionOid, bucketid, partExprKeyStr);
* Either we were dealing with a NOT MATCHED tuple or ExecMergeNotMatched()
* returned "false", indicating the previously MATCHED tuple is no longer a
* matching tuple.
*/
if (!matched)
ExecMergeNotMatched(mtstate, estate, slot, partExprKeyStr);
}
* Extract tuple for checking constraints from plan slot
*/
static TupleTableSlot* ExtractConstraintTuple(
ModifyTableState* mtstate, CmdType commandType, TupleTableSlot* slot, TupleDesc tupDesc)
{
ExprContext* econtext = mtstate->ps.ps_ExprContext;
AutoContextSwitch memContext(econtext->ecxt_per_tuple_memory);
HeapTuple tempTuple = NULL;
TupleTableSlot* constrSlot = NULL;
Datum* values = (Datum*)palloc0(sizeof(Datum) * tupDesc->natts);
bool* isnull = (bool*)palloc0(sizeof(bool) * tupDesc->natts);
TupleDesc originTupleDesc = slot->tts_tupleDescriptor;
int index = 0;
int i = 0;
switch (commandType) {
case CMD_UPDATE:
constrSlot = mtstate->mt_update_constr_slot;
for (i = 0; i < originTupleDesc->natts; i++) {
if (strstr(originTupleDesc->attrs[i].attname.data, "action UPDATE target")) {
values[index] = slot->tts_values[i];
isnull[index] = slot->tts_isnull[i];
index++;
}
}
break;
case CMD_INSERT:
constrSlot = mtstate->mt_insert_constr_slot;
for (i = 0; i < originTupleDesc->natts; i++) {
if (strstr(originTupleDesc->attrs[i].attname.data, "action INSERT target")) {
values[index] = slot->tts_values[i];
isnull[index] = slot->tts_isnull[i];
index++;
}
}
break;
default:
Assert(0);
}
Assert(constrSlot->tts_tupleDescriptor->td_tam_ops == originTupleDesc->td_tam_ops);
tempTuple = (HeapTuple)tableam_tops_form_tuple(tupDesc, values, isnull);
(void)ExecStoreTuple(tempTuple, constrSlot, InvalidBuffer, false);
return constrSlot;
}
* Extract scan tuple for target table from plan slot
*/
TupleTableSlot* ExtractScanTuple(ModifyTableState* mtstate, TupleTableSlot* slot, TupleDesc tupDesc)
{
List* sourceTargetList = ((ModifyTable*)mtstate->ps.plan)->mergeSourceTargetList;
ExprContext* econtext = mtstate->ps.ps_ExprContext;
AutoContextSwitch memContext(econtext->ecxt_per_tuple_memory);
HeapTuple tempTuple = NULL;
TupleTableSlot* scanSlot = mtstate->mt_scan_slot;
ListCell* lc = NULL;
Datum* values = (Datum*)palloc0(sizeof(Datum) * tupDesc->natts);
bool* isnull = (bool*)palloc0(sizeof(bool) * tupDesc->natts);
int startIdx = 0;
int index = 0;
* Find the right start index for target table. We should skip the sourceTargetList.
* First count the number of source targetlist. We add new columns to sourceTargetList
* but the resno is not continuous, so find the max continuous number to be the original
* length of sourceTargetList.
*/
foreach (lc, sourceTargetList) {
TargetEntry* tle = (TargetEntry*)lfirst(lc);
if (tle->resno != startIdx + 1)
break;
startIdx++;
}
for (index = 0; index < tupDesc->natts; index++) {
if (tupDesc->attrs[index].attisdropped == true) {
isnull[index] = true;
continue;
}
Assert(startIdx < slot->tts_tupleDescriptor->natts);
values[index] = tableam_tslot_getattr(slot, startIdx + 1, &isnull[index]);
startIdx++;
}
tempTuple = (HeapTuple)tableam_tops_form_tuple(tupDesc, values, isnull);
(void)ExecStoreTuple(tempTuple, scanSlot, InvalidBuffer, false);
return scanSlot;
}
* Description: projects and evaluates qual condition for update action.
* Parameters:
* @in mtstate: modifytable state.
* @in mergeMatchedActionStates: update action states.
* @in econtext: expression context.
* @in originSlot: slot to be projected.
* @in result_slot: slot to be returned.
* @in estate: working state for executor.
* Return: slot has been projected..
*/
TupleTableSlot* ExecMergeProjQual(ModifyTableState* mtstate, List* mergeMatchedActionStates, ExprContext* econtext,
TupleTableSlot* originSlot, TupleTableSlot* result_slot, EState* estate)
{
if (mergeMatchedActionStates != NIL) {
MergeActionState* action = (MergeActionState*)linitial(mergeMatchedActionStates);
ResultRelInfo* resultRelInfo = NULL;
Relation resultRelationDesc;
Assert(CMD_UPDATE == action->commandType);
* get information on the (current) result relation
*/
resultRelInfo = estate->es_result_relation_info;
resultRelationDesc = resultRelInfo->ri_RelationDesc;
* Make tuple and any needed join variables available to ExecQual and
* ExecProject. The target's existing tuple is installed in the scantuple.
* Again, this target relation's slot is required only in the case of a
* MATCHED tuple and UPDATE/DELETE actions.
*/
if (estate->es_result_update_remoterel == NULL) {
econtext->ecxt_scantuple = ExtractScanTuple(mtstate, originSlot, action->tupDesc);
econtext->ecxt_innertuple = originSlot;
econtext->ecxt_outertuple = NULL;
} else {
econtext->ecxt_scantuple = originSlot;
econtext->ecxt_innertuple = NULL;
econtext->ecxt_outertuple = NULL;
}
* Test condition, if any
*
* In the absence of a condition we perform the action unconditionally
* (no need to check separately since ExecQual() will return true if
* there are no conditions to evaluate).
*/
if (ExecQual((List*)action->whenqual, econtext)) {
if (estate->es_result_update_remoterel == NULL) {
* We set up the projection earlier, so all we do here is
* Project, no need for any other tasks prior to the
* ExecUpdate.
*/
result_slot = ExecProject(action->proj);
} else {
}
* We don't call ExecFilterJunk() because the projected tuple
* using the UPDATE action's targetlist doesn't have a junk
* attribute.
*/
if (estate->es_result_update_remoterel) {
estate->es_result_remoterel = estate->es_result_update_remoterel;
if (resultRelationDesc->rd_att->constr) {
mtstate->mt_update_constr_slot =
ExtractConstraintTuple(mtstate, CMD_UPDATE, result_slot, action->tupDesc);
}
}
return result_slot;
}
}
return NULL;
}
* Check and execute the first qualifying MATCHED action. The current target
* tuple is identified by tupleid.
*
* We start from the first WHEN MATCHED action and check if the WHEN AND quals
* pass, if any. If the WHEN AND quals for the first action do not pass, we
* check the second, then the third and so on. If we reach to the end, no
* action is taken and we return true, indicating that no further action is
* required for this tuple.
*
* If we do find a qualifying action, then we attempt to execute the action.
*
* If the tuple is concurrently updated, EvalPlanQual is run with the updated
* tuple to recheck the join quals. Note that the additional quals associated
* with individual actions are evaluated separately by the MERGE code, while
* EvalPlanQual checks for the join quals. If EvalPlanQual tells us that the
* updated tuple still passes the join quals, then we restart from the first
* action to look for a qualifying action. Otherwise, we return false meaning
* that a NOT MATCHED action must now be executed for the current source tuple.
*/
static bool ExecMergeMatched(ModifyTableState* mtstate, EState* estate, TupleTableSlot* slot, JunkFilter* junkfilter,
ItemPointer tupleid, HeapTupleHeader oldtuple, Oid oldPartitionOid, int2 bucketid, char* partExprKeyStr)
{
ExprContext* econtext = mtstate->ps.ps_ExprContext;
List* mergeMatchedActionStates = NIL;
EPQState* epqstate = &mtstate->mt_epqstate;
ResultRelInfo* saved_resultRelInfo = NULL;
ResultRelInfo* resultRelInfo = estate->es_result_relation_info;
TupleTableSlot* saved_slot = slot;
bool partKeyUpdated = ((ModifyTable*)mtstate->ps.plan)->partKeyUpdated;
* Save the current information and work with the correct result relation.
*/
saved_resultRelInfo = resultRelInfo;
estate->es_result_relation_info = resultRelInfo;
* And get the correct action lists.
*/
mergeMatchedActionStates = resultRelInfo->ri_mergeState->matchedActionStates;
if (mergeMatchedActionStates != NIL) {
MergeActionState* action = (MergeActionState*)linitial(mergeMatchedActionStates);
lmerge_matched:
slot = ExecMergeProjQual(mtstate, mergeMatchedActionStates, econtext, slot, slot, estate);
if (slot != NULL) {
TM_Result result = TM_Ok;
TM_FailureData tmfd;
(void)ExecUpdate(tupleid,
oldPartitionOid,
bucketid,
oldtuple,
slot,
saved_slot,
epqstate,
mtstate,
mtstate->canSetTag,
partKeyUpdated,
&result,
partExprKeyStr,
&tmfd);
* The matched tuple has been updated or deleted by trigger or
* other session, we have to check the updated version of the
* tuple to see if we want to process it under RC rules.
*/
switch (result) {
case TM_Ok:
break;
case TM_SelfUpdated:
case TM_SelfModified:
if (TransactionIdIsCurrentTransactionId(tmfd.xmax)) {
if (!MERGE_UPDATE_MULTI)
ereport(ERROR,
(errcode(ERRCODE_CARDINALITY_VIOLATION),
errmsg("MERGE command cannot affect row a second time"),
errhint("Ensure that not more than one source row matches any one target row.")));
break;
}
elog(ERROR, "attempted to update or delete invisible tuple");
break;
case TM_Deleted:
if (IsolationUsesXactSnapshot())
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to concurrent delete")));
if (resultRelInfo->ri_RelationDesc->rd_rel->relrowmovement) {
Assert(RELATION_IS_PARTITIONED(resultRelInfo->ri_RelationDesc));
* the may be a row movement update action which delete tuple from original
* partition and insert tuple to new partition or we can add lock on the tuple to
* be delete or updated to avoid throw exception
*/
ereport(ERROR,
(errcode(ERRCODE_TRANSACTION_ROLLBACK),
errmsg("partition table update conflict"),
errdetail("disable row movement of table can avoid this conflict")));
}
* If the tuple was already deleted, return to let caller
* handle it under NOT MATCHED clauses.
*/
return false;
case TM_Updated:
{
Relation resultRelationDesc;
Relation partRelationDesc;
TupleTableSlot *epqslot;
LockTupleMode lockmode;
bool isNull,
isPartition;
* The target tuple was concurrently updated by some other
* transaction. Run EvalPlanQual() with the new version of
* the tuple. If it does not return a tuple, then we
* switch to the NOT MATCHED list of actions. If it does
* return a tuple and the join qual is still satisfied,
* then we just need to recheck the MATCHED actions,
* starting from the top, and execute the first qualifying
* action.
*/
resultRelationDesc = resultRelInfo->ri_RelationDesc;
isPartition = RELATION_IS_PARTITIONED(resultRelationDesc);
if (isPartition) {
Partition partition = NULL;
searchFakeReationForPartitionOid(estate->esfRelations,
estate->es_query_cxt,
resultRelationDesc,
oldPartitionOid,
GetCurrentPartitionNo(oldPartitionOid),
partRelationDesc,
partition,
RowExclusiveLock);
}
lockmode = ExecUpdateLockMode(estate, resultRelInfo);
epqslot = EvalPlanQual(estate, epqstate,
isPartition ? partRelationDesc : resultRelationDesc,
resultRelInfo->ri_RangeTableIndex,
lockmode, &tmfd.ctid, tmfd.xmax,
resultRelationDesc->rd_rel->relrowmovement);
* If we got no tuple, or the tuple we get has a
* NULL ctid, go back to caller: this one is not a
* MATCHED tuple anymore, so they can retry with
* NOT MATCHED actions.
*/
if (TupIsNull(epqslot))
return false;
(void) ExecGetJunkAttribute(epqslot,
resultRelInfo->ri_junkFilter->jf_junkAttNo,
&isNull);
if (isNull)
return false;
* For partitioned table we have to check if the partition oid
* is NULL.
*/
if (isPartition) {
Datum partoid;
partoid = ExecGetJunkAttribute(epqslot,
resultRelInfo->ri_partOidAttNum,
&isNull);
if (isNull)
ereport(ERROR,
(errcode(ERRCODE_NULL_JUNK_ATTRIBUTE),
errmsg("tableoid is null when merge partitioned table")));
Assert(oldPartitionOid == DatumGetObjectId(partoid));
}
* A non-NULL ctid means that we are still dealing
* with MATCHED case. Restart the loop so that we
* apply all the MATCHED rules again, to ensure
* that the first qualifying WHEN MATCHED action
* is executed.
*
* Update tupleid to that of the new tuple, for
* the refetch we do at the top.
*/
saved_slot = slot = epqslot;
*tupleid = tmfd.ctid;
goto lmerge_matched;
}
default:
elog(ERROR, "unexpected tuple operation result: %d", result);
break;
}
}
if (action->commandType == CMD_UPDATE )
InstrCountFiltered2(&mtstate->ps, 1);
* We've activated one of the WHEN clauses, so we don't search
* further. This is required behaviour, not an optimization.
*/
estate->es_result_relation_info = saved_resultRelInfo;
}
* Successfully executed an action or no qualifying action was found.
*/
return true;
}
* Execute the first qualifying NOT MATCHED action.
*/
static void ExecMergeNotMatched(ModifyTableState* mtstate, EState* estate, TupleTableSlot* slot, char* partExprKeyStr)
{
ExprContext* econtext = mtstate->ps.ps_ExprContext;
List* mergeNotMatchedActionStates = NIL;
ResultRelInfo* resultRelInfo = NULL;
TupleTableSlot* myslot = NULL;
const int hi_options = 0;
* We are dealing with NOT MATCHED tuple. Since for MERGE, the partition
* tree is not expanded for the result relation, we continue to work with
* the currently active result relation, which corresponds to the root
* of the partition tree.
*/
resultRelInfo = mtstate->resultRelInfo;
* For INSERT actions, root relation's merge action is OK since the
* INSERT's targetlist and the WHEN conditions can only refer to the
* source relation and hence it does not matter which result relation we
* work with.
*/
mergeNotMatchedActionStates = resultRelInfo->ri_mergeState->notMatchedActionStates;
* Make source tuple available to ExecQual and ExecProject. We don't need
* the target tuple since the WHEN quals and the targetlist can't refer to
* the target columns.
*/
if (estate->es_result_insert_remoterel == NULL) {
econtext->ecxt_scantuple = slot;
econtext->ecxt_innertuple = slot;
econtext->ecxt_outertuple = NULL;
} else {
econtext->ecxt_scantuple = slot;
econtext->ecxt_innertuple = NULL;
econtext->ecxt_outertuple = NULL;
}
if (mergeNotMatchedActionStates != NIL) {
MergeActionState* action = (MergeActionState*)linitial(mergeNotMatchedActionStates);
ResultRelInfo* resultRelationInfo = NULL;
Relation resultRelationDesc;
Assert(CMD_INSERT == action->commandType);
* get information on the (current) result relation
*/
resultRelationInfo = estate->es_result_relation_info;
resultRelationDesc = resultRelationInfo->ri_RelationDesc;
* Test condition, if any
*
* In the absence of a condition we perform the action unconditionally
* (no need to check separately since ExecQual() will return true if
* there are no conditions to evaluate).
*/
if (ExecQual((List *)action->whenqual, econtext)) {
* We set up the projection earlier, so all we do here is
* Project, no need for any other tasks prior to the
* ExecInsert.
*/
if (estate->es_result_insert_remoterel == NULL) {
ExecProject(action->proj);
* ExecPrepareTupleRouting may modify the passed-in slot. Hence
* pass a local reference so that action->slot is not modified.
*/
myslot = mtstate->mt_mergeproj;
} else {
myslot = slot;
if (resultRelationDesc->rd_att->constr) {
mtstate->mt_insert_constr_slot = ExtractConstraintTuple(mtstate, CMD_INSERT, slot, action->tupDesc);
}
}
estate->es_result_remoterel = estate->es_result_insert_remoterel;
(void)ExecInsertT<false>(mtstate, myslot, slot, estate, mtstate->canSetTag, hi_options, NULL, partExprKeyStr);
InstrCountFiltered1(&mtstate->ps, 1);
}
}
}
* Creates the run-time state information for the Merge node
*/
void ExecInitMerge(ModifyTableState* mtstate, EState* estate, ResultRelInfo* resultRelInfo)
{
ListCell* l = NULL;
ExprContext* econtext = NULL;
List* mergeMatchedActionStates = NIL;
List* mergeNotMatchedActionStates = NIL;
TupleDesc relationDesc = resultRelInfo->ri_RelationDesc->rd_att;
ModifyTable* node = (ModifyTable*)mtstate->ps.plan;
if (node->mergeActionList == NIL)
return;
mtstate->mt_merge_subcommands = 0;
if (mtstate->ps.ps_ExprContext == NULL)
ExecAssignExprContext(estate, &mtstate->ps);
econtext = mtstate->ps.ps_ExprContext;
mtstate->mt_scan_slot = NULL;
mtstate->mt_update_constr_slot = NULL;
mtstate->mt_insert_constr_slot = NULL;
Assert(mtstate->mt_mergeproj == NULL);
mtstate->mt_mergeproj = ExecInitExtraTupleSlot(mtstate->ps.state);
ExecSetSlotDescriptor(mtstate->mt_mergeproj, relationDesc);
* Create a MergeActionState for each action on the mergeActionList
* and add it to either a list of matched actions or not-matched
* actions.
*/
foreach (l, node->mergeActionList) {
MergeAction* action = (MergeAction*)lfirst(l);
MergeActionState* action_state = makeNode(MergeActionState);
TupleDesc tupDesc;
action_state->matched = action->matched;
action_state->commandType = action->commandType;
if (estate->es_is_flt_frame) {
action_state->whenqual = ExecInitQualByFlatten((List*)action->qual, &mtstate->ps);
} else {
action_state->whenqual = ExecInitExprByRecursion((Expr*)action->qual, &mtstate->ps);
}
tupDesc = ExecTypeFromTL((List*)action->targetList, false, true, relationDesc->td_tam_ops);
action_state->tupDesc = tupDesc;
if (IS_PGXC_DATANODE && CMD_UPDATE == action->commandType) {
mtstate->mt_scan_slot = MakeSingleTupleTableSlot(tupDesc);
}
if (IS_PGXC_COORDINATOR && CMD_UPDATE == action->commandType && relationDesc->constr != NULL) {
mtstate->mt_update_constr_slot = MakeSingleTupleTableSlot(tupDesc);
}
if (IS_PGXC_COORDINATOR && CMD_INSERT == action->commandType && relationDesc->constr != NULL) {
mtstate->mt_insert_constr_slot = MakeSingleTupleTableSlot(tupDesc);
}
action_state->proj = ExecBuildProjectionInfo(action->targetList, econtext, mtstate->mt_mergeproj, &mtstate->ps, relationDesc);
* We create two lists - one for WHEN MATCHED actions and one
* for WHEN NOT MATCHED actions - and stick the
* MergeActionState into the appropriate list.
*/
if (action_state->matched)
mergeMatchedActionStates = lappend(mergeMatchedActionStates, action_state);
else
mergeNotMatchedActionStates = lappend(mergeNotMatchedActionStates, action_state);
switch (action->commandType) {
case CMD_INSERT:
ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc, action->targetList);
mtstate->mt_merge_subcommands |= MERGE_INSERT;
break;
case CMD_UPDATE:
ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc, action->targetList);
mtstate->mt_merge_subcommands |= MERGE_UPDATE;
break;
default:
Assert(0);
break;
}
resultRelInfo->ri_mergeState->matchedActionStates = mergeMatchedActionStates;
resultRelInfo->ri_mergeState->notMatchedActionStates = mergeNotMatchedActionStates;
}
}