/*
 * Copyright (c) 2020 Huawei Technologies Co.,Ltd.
 *
 * openGauss is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * execMerge.cpp
 *	  routines to handle Merge nodes relating to the MERGE command
 *
 * IDENTIFICATION
 *	  src/gausskernel/runtime/executor/execMerge.cpp
 *
 * -------------------------------------------------------------------------
 */
#include "postgres.h"
#include "knl/knl_variable.h"

#include "access/tableam.h"
#include "executor/executor.h"
#include "executor/node/nodeModifyTable.h"
#include "executor/exec/execMerge.h"
#include "miscadmin.h"
#include "nodes/nodeFuncs.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "access/heapam.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"

static void ExecMergeNotMatched(ModifyTableState* mtstate, EState* estate, TupleTableSlot* slot, char* partExprKeyStr = NULL);
static bool ExecMergeMatched(ModifyTableState* mtstate, EState* estate, TupleTableSlot* slot, JunkFilter* junkfilter,
    ItemPointer tupleid, HeapTupleHeader oldtuple, Oid oldPartitionOid, int2 bucketid, char* partExprKeyStr = NULL);
static LockTupleMode ExecUpdateLockMode(EState *estate, ResultRelInfo *relinfo);

#define GET_ALL_UPDATED_COLUMNS(relinfo, estate)                                     \
    (bms_union(exec_rt_fetch((relinfo)->ri_RangeTableIndex, estate)->updatedCols, \
        exec_rt_fetch((relinfo)->ri_RangeTableIndex, estate)->extraUpdatedCols))

/*
 * ExecUpdateLockMode -- find the appropriate UPDATE tuple lock mode for a
 * given ResultRelInfo
 */
static LockTupleMode
ExecUpdateLockMode(EState *estate, ResultRelInfo *relinfo)
{
    Bitmapset  *keyCols;
    Bitmapset  *updatedCols;

    /*
     * Compute lock mode to use.  If columns that are part of the key have not
     * been modified, then we can use a weaker lock, allowing for better
     * concurrency.
     */
    updatedCols = GET_ALL_UPDATED_COLUMNS(relinfo, estate);
    keyCols = RelationGetIndexAttrBitmap(relinfo->ri_RelationDesc,
                                         INDEX_ATTR_BITMAP_KEY);

    if (bms_overlap(keyCols, updatedCols))
        return LockTupleExclusive;

    return LockTupleNoKeyExclusive;
}

/*
 * Perform MERGE.
 */
void ExecMerge(ModifyTableState* mtstate, EState* estate, TupleTableSlot* slot, JunkFilter* junkfilter,
    ResultRelInfo* resultRelInfo, char* partExprKeyStr)
{
    ExprContext* econtext = mtstate->ps.ps_ExprContext;
    ItemPointer tupleid;
    ItemPointerData tuple_ctid;
    bool matched = false;
    Datum datum;
    bool isNull = false;
    HeapTupleHeader oldtuple = NULL;
    Oid oldPartitionOid = InvalidOid;
    AttrNumber partOidNum;
    AttrNumber bucketIdNum;
    int2 bucketid = InvalidBktId;

    Assert(resultRelInfo->ri_RelationDesc->rd_rel->relkind == RELKIND_RELATION ||
           resultRelInfo->ri_RelationDesc->rd_rel->relkind == PARTTYPE_PARTITIONED_RELATION ||
           junkfilter != NULL);

    /*
     * Reset per-tuple memory context to free any expression evaluation
     * storage allocated in the previous cycle.
     */
    ResetExprContext(econtext);

    /*
     * We run a JOIN between the target relation and the source relation to
     * find a set of candidate source rows that has matching row in the target
     * table and a set of candidate source rows that does not have matching
     * row in the target table. If the join returns us a tuple with target
     * relation's tid set, that implies that the join found a matching row for
     * the given source tuple. This case triggers the WHEN MATCHED clause of
     * the MERGE. Whereas a NULL in the target relation's ctid column
     * indicates a NOT MATCHED case.
     */
    datum = ExecGetJunkAttribute(slot, junkfilter->jf_junkAttNo, &isNull);

    if (!isNull) {
        matched = true;
        tupleid = (ItemPointer)DatumGetPointer(datum);
        tuple_ctid = *tupleid; /* be sure we don't free ctid!! */
        tupleid = &tuple_ctid;

        if (RELATION_IS_PARTITIONED(resultRelInfo->ri_RelationDesc) ||
            RelationIsCUFormat(resultRelInfo->ri_RelationDesc)) {
            Datum tableOiddatum;
            bool tableOidisnull = false;

            partOidNum = resultRelInfo->ri_partOidAttNum;
            tableOiddatum = ExecGetJunkAttribute(slot, partOidNum, &tableOidisnull);

            if (tableOidisnull) {
                ereport(ERROR,
                    (errcode(ERRCODE_NULL_JUNK_ATTRIBUTE), errmsg("tableoid is null when merge partitioned table")));
            }

            oldPartitionOid = DatumGetObjectId(tableOiddatum);
        }

        if (RELATION_HAS_BUCKET(resultRelInfo->ri_RelationDesc)) {
            Datum bucketIddatum;
            bool bucketIdisnull = false;

            bucketIdNum = resultRelInfo->ri_bucketIdAttNum;
            bucketIddatum = ExecGetJunkAttribute(slot, bucketIdNum, &bucketIdisnull);

            if (bucketIdisnull) {
                ereport(ERROR, (errcode(ERRCODE_NULL_JUNK_ATTRIBUTE), errmsg("bucketid is null when merge table")));
            }

            bucketid = DatumGetObjectId(bucketIddatum);
        }

    } else {
        matched = false;
        tupleid = NULL; /* we don't need it for INSERT actions */
    }

    /*
     * If we are dealing with a WHEN MATCHED case, we execute the first action
     * for which the additional WHEN MATCHED AND quals pass. If an action
     * without quals is found, that action is executed.
     *
     * Similarly, if we are dealing with WHEN NOT MATCHED case, we look at the
     * given WHEN NOT MATCHED actions in sequence until one passes.
     *
     * Things get interesting in case of concurrent update/delete of the
     * target tuple. Such concurrent update/delete is detected while we are
     * executing a WHEN MATCHED action.
     *
     * A concurrent update can:
     *
     * 1. modify the target tuple so that it no longer satisfies the
     * additional quals attached to the current WHEN MATCHED action OR
     *
     * In this case, we are still dealing with a WHEN MATCHED case, but
     * we should recheck the list of WHEN MATCHED actions and choose the first
     * one that satisfies the new target tuple.
     *
     * 2. modify the target tuple so that the join quals no longer pass and
     * hence the source tuple no longer has a match.
     *
     * In the second case, the source tuple no longer matches the target tuple,
     * so we now instead find a qualifying WHEN NOT MATCHED action to execute.
     *
     * A concurrent delete, changes a WHEN MATCHED case to WHEN NOT MATCHED.
     *
     * ExecMergeMatched takes care of following the update chain and
     * re-finding the qualifying WHEN MATCHED action, as long as the updated
     * target tuple still satisfies the join quals i.e. it still remains a
     * WHEN MATCHED case. If the tuple gets deleted or the join quals fail, it
     * returns and we try ExecMergeNotMatched. Given that ExecMergeMatched
     * always make progress by following the update chain and we never switch
     * from ExecMergeNotMatched to ExecMergeMatched, there is no risk of a
     * livelock.
     */
    if (matched)
        matched = ExecMergeMatched(mtstate, estate, slot, junkfilter, tupleid, oldtuple, oldPartitionOid, bucketid, partExprKeyStr);

    /*
     * Either we were dealing with a NOT MATCHED tuple or ExecMergeNotMatched()
     * returned "false", indicating the previously MATCHED tuple is no longer a
     * matching tuple.
     */
    if (!matched)
        ExecMergeNotMatched(mtstate, estate, slot, partExprKeyStr);
}

/*
 * Extract tuple for checking constraints from plan slot
 */
static TupleTableSlot* ExtractConstraintTuple(
    ModifyTableState* mtstate, CmdType commandType, TupleTableSlot* slot, TupleDesc tupDesc)
{
    ExprContext* econtext = mtstate->ps.ps_ExprContext;
    AutoContextSwitch memContext(econtext->ecxt_per_tuple_memory);
    HeapTuple tempTuple = NULL;
    TupleTableSlot* constrSlot = NULL;
    Datum* values = (Datum*)palloc0(sizeof(Datum) * tupDesc->natts);
    bool* isnull = (bool*)palloc0(sizeof(bool) * tupDesc->natts);
    TupleDesc originTupleDesc = slot->tts_tupleDescriptor;
    int index = 0;
    int i = 0;

    switch (commandType) {
        case CMD_UPDATE:
            constrSlot = mtstate->mt_update_constr_slot;
            for (i = 0; i < originTupleDesc->natts; i++) {
                if (strstr(originTupleDesc->attrs[i].attname.data, "action UPDATE target")) {
                    values[index] = slot->tts_values[i];
                    isnull[index] = slot->tts_isnull[i];
                    index++;
                }
            }
            break;
        case CMD_INSERT:
            constrSlot = mtstate->mt_insert_constr_slot;
            for (i = 0; i < originTupleDesc->natts; i++) {
                if (strstr(originTupleDesc->attrs[i].attname.data, "action INSERT target")) {
                    values[index] = slot->tts_values[i];
                    isnull[index] = slot->tts_isnull[i];
                    index++;
                }
            }
            break;
        default:
            Assert(0);
    }

    Assert(constrSlot->tts_tupleDescriptor->td_tam_ops == originTupleDesc->td_tam_ops);
    tempTuple = (HeapTuple)tableam_tops_form_tuple(tupDesc, values, isnull);
    (void)ExecStoreTuple(tempTuple, constrSlot, InvalidBuffer, false);

    return constrSlot;
}

/*
 * Extract scan tuple for target table from plan slot
 */
TupleTableSlot* ExtractScanTuple(ModifyTableState* mtstate, TupleTableSlot* slot, TupleDesc tupDesc)
{
    List* sourceTargetList = ((ModifyTable*)mtstate->ps.plan)->mergeSourceTargetList;
    ExprContext* econtext = mtstate->ps.ps_ExprContext;
    AutoContextSwitch memContext(econtext->ecxt_per_tuple_memory);
    HeapTuple tempTuple = NULL;
    TupleTableSlot* scanSlot = mtstate->mt_scan_slot;
    ListCell* lc = NULL;
    Datum* values = (Datum*)palloc0(sizeof(Datum) * tupDesc->natts);
    bool* isnull = (bool*)palloc0(sizeof(bool) * tupDesc->natts);
    int startIdx = 0;
    int index = 0;

    /*
     * Find the right start index for target table. We should skip the sourceTargetList.
     * First count the number of source targetlist. We add new columns to sourceTargetList
     * but the resno is not continuous, so find the max continuous number to be the original
     * length of sourceTargetList.
     */
    foreach (lc, sourceTargetList) {
        TargetEntry* tle = (TargetEntry*)lfirst(lc);
        if (tle->resno != startIdx + 1)
            break;
        startIdx++;
    }

    for (index = 0; index < tupDesc->natts; index++) {
        if (tupDesc->attrs[index].attisdropped == true) {
            isnull[index] = true;
            continue;
        }

        Assert(startIdx < slot->tts_tupleDescriptor->natts);
        values[index] = tableam_tslot_getattr(slot, startIdx + 1, &isnull[index]);
        startIdx++;
    }

    tempTuple = (HeapTuple)tableam_tops_form_tuple(tupDesc, values, isnull);
    (void)ExecStoreTuple(tempTuple, scanSlot, InvalidBuffer, false);

    return scanSlot;
}

/*
 * Description: projects and evaluates qual condition for update action.
 * Parameters:
 * @in mtstate: modifytable state.
 * @in mergeMatchedActionStates: update action states.
 * @in econtext: expression context.
 * @in originSlot: slot to be projected.
 * @in result_slot: slot to be returned.
 * @in estate: working state for executor.
 * Return: slot has been projected..
 */
TupleTableSlot* ExecMergeProjQual(ModifyTableState* mtstate, List* mergeMatchedActionStates, ExprContext* econtext,
    TupleTableSlot* originSlot, TupleTableSlot* result_slot, EState* estate)
{
    if (mergeMatchedActionStates != NIL) {
        MergeActionState* action = (MergeActionState*)linitial(mergeMatchedActionStates);
        ResultRelInfo* resultRelInfo = NULL;
        Relation resultRelationDesc;

        Assert(CMD_UPDATE == action->commandType);

        /*
         * get information on the (current) result relation
         */
        resultRelInfo = estate->es_result_relation_info;
        resultRelationDesc = resultRelInfo->ri_RelationDesc;

        /*
         * Make tuple and any needed join variables available to ExecQual and
         * ExecProject. The target's existing tuple is installed in the scantuple.
         * Again, this target relation's slot is required only in the case of a
         * MATCHED tuple and UPDATE/DELETE actions.
         */
        if (estate->es_result_update_remoterel == NULL) {
            econtext->ecxt_scantuple = ExtractScanTuple(mtstate, originSlot, action->tupDesc);
            econtext->ecxt_innertuple = originSlot;
            econtext->ecxt_outertuple = NULL;
        } else {
            econtext->ecxt_scantuple = originSlot;
            econtext->ecxt_innertuple = NULL;
            econtext->ecxt_outertuple = NULL;
        }

        /*
         * Test condition, if any
         *
         * In the absence of a condition we perform the action unconditionally
         * (no need to check separately since ExecQual() will return true if
         * there are no conditions to evaluate).
         */
        if (ExecQual((List*)action->whenqual, econtext)) {
            if (estate->es_result_update_remoterel == NULL) {
                /*
                 * We set up the projection earlier, so all we do here is
                 * Project, no need for any other tasks prior to the
                 * ExecUpdate.
                 */
                result_slot = ExecProject(action->proj);
            } else {
                /* we don't do projection in remote query */
            }

            /*
             * We don't call ExecFilterJunk() because the projected tuple
             * using the UPDATE action's targetlist doesn't have a junk
             * attribute.
             */
            if (estate->es_result_update_remoterel) {
                estate->es_result_remoterel = estate->es_result_update_remoterel;

                /* Check if has constraints */
                if (resultRelationDesc->rd_att->constr) {
                    mtstate->mt_update_constr_slot =
                        ExtractConstraintTuple(mtstate, CMD_UPDATE, result_slot, action->tupDesc);
                }
            }

            return result_slot;
        }
    }
    return NULL;
}

/*
 * Check and execute the first qualifying MATCHED action. The current target
 * tuple is identified by tupleid.
 *
 * We start from the first WHEN MATCHED action and check if the WHEN AND quals
 * pass, if any. If the WHEN AND quals for the first action do not pass, we
 * check the second, then the third and so on. If we reach to the end, no
 * action is taken and we return true, indicating that no further action is
 * required for this tuple.
 *
 * If we do find a qualifying action, then we attempt to execute the action.
 *
 * If the tuple is concurrently updated, EvalPlanQual is run with the updated
 * tuple to recheck the join quals. Note that the additional quals associated
 * with individual actions are evaluated separately by the MERGE code, while
 * EvalPlanQual checks for the join quals. If EvalPlanQual tells us that the
 * updated tuple still passes the join quals, then we restart from the first
 * action to look for a qualifying action. Otherwise, we return false meaning
 * that a NOT MATCHED action must now be executed for the current source tuple.
 */
static bool ExecMergeMatched(ModifyTableState* mtstate, EState* estate, TupleTableSlot* slot, JunkFilter* junkfilter,
    ItemPointer tupleid, HeapTupleHeader oldtuple, Oid oldPartitionOid, int2 bucketid, char* partExprKeyStr)
{
    ExprContext* econtext = mtstate->ps.ps_ExprContext;
    List* mergeMatchedActionStates = NIL;
    EPQState* epqstate = &mtstate->mt_epqstate;
    ResultRelInfo* saved_resultRelInfo = NULL;
    ResultRelInfo* resultRelInfo = estate->es_result_relation_info;
    TupleTableSlot* saved_slot = slot;
    bool partKeyUpdated = ((ModifyTable*)mtstate->ps.plan)->partKeyUpdated;

    /*
     * Save the current information and work with the correct result relation.
     */
    saved_resultRelInfo = resultRelInfo;
    estate->es_result_relation_info = resultRelInfo;

    /*
     * And get the correct action lists.
     */
    mergeMatchedActionStates = resultRelInfo->ri_mergeState->matchedActionStates;

    if (mergeMatchedActionStates != NIL) {
        MergeActionState* action = (MergeActionState*)linitial(mergeMatchedActionStates);

lmerge_matched:
        slot = ExecMergeProjQual(mtstate, mergeMatchedActionStates, econtext, slot, slot, estate);

        if (slot != NULL) {
            TM_Result result = TM_Ok;
            TM_FailureData tmfd;
            (void)ExecUpdate(tupleid,
                             oldPartitionOid,
                             bucketid,
                             oldtuple,
                             slot,
                             saved_slot,
                             epqstate,
                             mtstate,
                             mtstate->canSetTag,
                             partKeyUpdated,
                             &result,
                             partExprKeyStr,
                             &tmfd);

            /*
             * The matched tuple has been updated or deleted by trigger or
             * other session, we have to check the updated version of the
             * tuple to see if we want to process it under RC rules.
             */
            switch (result) {
                case TM_Ok:
                    /* all good; perform final actions */
                    break;
                case TM_SelfUpdated:
                case TM_SelfModified:
                    /* The SQL standard disallows this for MERGE, but... */
                    if (TransactionIdIsCurrentTransactionId(tmfd.xmax)) {
                        if (!MERGE_UPDATE_MULTI)
                            ereport(ERROR,
                                    (errcode(ERRCODE_CARDINALITY_VIOLATION),
                                    errmsg("MERGE command cannot affect row a second time"),
                                    errhint("Ensure that not more than one source row matches any one target row.")));
                        /* already updated or deleted. */
                        break;
                    }
                    /* This shouldn't happen */
                    elog(ERROR, "attempted to update or delete invisible tuple");
                    break;
                case TM_Deleted:
                    if (IsolationUsesXactSnapshot())
                        ereport(ERROR,
                                (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                                errmsg("could not serialize access due to concurrent delete")));

                    if (resultRelInfo->ri_RelationDesc->rd_rel->relrowmovement) {
                        Assert(RELATION_IS_PARTITIONED(resultRelInfo->ri_RelationDesc));
                        /*
                         * the may be a row movement update action which delete tuple from original
                         * partition and insert tuple to new partition or we can add lock on the tuple to
                         * be delete or updated to avoid throw exception
                         */
                        ereport(ERROR,
                                (errcode(ERRCODE_TRANSACTION_ROLLBACK),
                                errmsg("partition table update conflict"),
                                errdetail("disable row movement of table can avoid this conflict")));
                    }
                    /*
                    * If the tuple was already deleted, return to let caller
                    * handle it under NOT MATCHED clauses.
                    */
                    return false;
                case TM_Updated:
                    {
                        Relation   resultRelationDesc;
                        Relation   partRelationDesc;
                        TupleTableSlot *epqslot;
                        LockTupleMode lockmode;
                        bool    isNull,
                                isPartition;

                        /*
                         * The target tuple was concurrently updated by some other
                         * transaction. Run EvalPlanQual() with the new version of
                         * the tuple. If it does not return a tuple, then we
                         * switch to the NOT MATCHED list of actions. If it does
                         * return a tuple and the join qual is still satisfied,
                         * then we just need to recheck the MATCHED actions,
                         * starting from the top, and execute the first qualifying
                         * action.
                         */
                        resultRelationDesc = resultRelInfo->ri_RelationDesc;
                        isPartition = RELATION_IS_PARTITIONED(resultRelationDesc);
                        if (isPartition) {
                            Partition partition = NULL;

                            searchFakeReationForPartitionOid(estate->esfRelations,
                                estate->es_query_cxt,
                                resultRelationDesc,
                                oldPartitionOid,
                                GetCurrentPartitionNo(oldPartitionOid),
                                partRelationDesc,
                                partition,
                                RowExclusiveLock);
                        }
                        lockmode = ExecUpdateLockMode(estate, resultRelInfo);
                        epqslot = EvalPlanQual(estate, epqstate,
                                               isPartition ? partRelationDesc : resultRelationDesc,
                                               resultRelInfo->ri_RangeTableIndex,
                                               lockmode, &tmfd.ctid, tmfd.xmax,
                                               resultRelationDesc->rd_rel->relrowmovement);

                        /*
                         * If we got no tuple, or the tuple we get has a
                         * NULL ctid, go back to caller: this one is not a
                         * MATCHED tuple anymore, so they can retry with
                         * NOT MATCHED actions.
                         */
                        if (TupIsNull(epqslot))
                            return false;

                        (void) ExecGetJunkAttribute(epqslot,
                                                    resultRelInfo->ri_junkFilter->jf_junkAttNo,
                                                    &isNull);
                        if (isNull)
                            return false;

                        /*
                         * For partitioned table we have to check if the partition oid
                         * is NULL.
                         */
                        if (isPartition) {
                            Datum partoid;
                            partoid = ExecGetJunkAttribute(epqslot,
                                                           resultRelInfo->ri_partOidAttNum,
                                                           &isNull);
                            if (isNull)
                                ereport(ERROR,
                                        (errcode(ERRCODE_NULL_JUNK_ATTRIBUTE),
                                         errmsg("tableoid is null when merge partitioned table")));
                            Assert(oldPartitionOid == DatumGetObjectId(partoid));
                        }

                        /*
                         * A non-NULL ctid means that we are still dealing
                         * with MATCHED case. Restart the loop so that we
                         * apply all the MATCHED rules again, to ensure
                         * that the first qualifying WHEN MATCHED action
                         * is executed.
                         *
                         * Update tupleid to that of the new tuple, for
                         * the refetch we do at the top.
                         */
                        saved_slot = slot = epqslot;
                        *tupleid = tmfd.ctid;
                        goto lmerge_matched;
                    }
                default:
                    elog(ERROR, "unexpected tuple operation result: %d", result);
                    break;
            }
        }
        if (action->commandType == CMD_UPDATE /* && tuple_updated*/)
            InstrCountFiltered2(&mtstate->ps, 1);

        /*
         * We've activated one of the WHEN clauses, so we don't search
         * further. This is required behaviour, not an optimization.
         */
        estate->es_result_relation_info = saved_resultRelInfo;
    }

    /*
     * Successfully executed an action or no qualifying action was found.
     */
    return true;
}

/*
 * Execute the first qualifying NOT MATCHED action.
 */
static void ExecMergeNotMatched(ModifyTableState* mtstate, EState* estate, TupleTableSlot* slot, char* partExprKeyStr)
{
    ExprContext* econtext = mtstate->ps.ps_ExprContext;
    List* mergeNotMatchedActionStates = NIL;
    ResultRelInfo* resultRelInfo = NULL;
    TupleTableSlot* myslot = NULL;
    const int hi_options = 0;

    /*
     * We are dealing with NOT MATCHED tuple. Since for MERGE, the partition
     * tree is not expanded for the result relation, we continue to work with
     * the currently active result relation, which corresponds to the root
     * of the partition tree.
     */
    resultRelInfo = mtstate->resultRelInfo;

    /*
     * For INSERT actions, root relation's merge action is OK since the
     * INSERT's targetlist and the WHEN conditions can only refer to the
     * source relation and hence it does not matter which result relation we
     * work with.
     */
    mergeNotMatchedActionStates = resultRelInfo->ri_mergeState->notMatchedActionStates;

    /*
     * Make source tuple available to ExecQual and ExecProject. We don't need
     * the target tuple since the WHEN quals and the targetlist can't refer to
     * the target columns.
     */
    if (estate->es_result_insert_remoterel == NULL) {
        econtext->ecxt_scantuple = slot;
        econtext->ecxt_innertuple = slot;
        econtext->ecxt_outertuple = NULL;
    } else {
        econtext->ecxt_scantuple = slot;
        econtext->ecxt_innertuple = NULL;
        econtext->ecxt_outertuple = NULL;
    }

    if (mergeNotMatchedActionStates != NIL) {
        MergeActionState* action = (MergeActionState*)linitial(mergeNotMatchedActionStates);
        ResultRelInfo* resultRelationInfo = NULL;
        Relation resultRelationDesc;

        Assert(CMD_INSERT == action->commandType);

        /*
         * get information on the (current) result relation
         */
        resultRelationInfo = estate->es_result_relation_info;
        resultRelationDesc = resultRelationInfo->ri_RelationDesc;

        /*
         * Test condition, if any
         *
         * In the absence of a condition we perform the action unconditionally
         * (no need to check separately since ExecQual() will return true if
         * there are no conditions to evaluate).
         */
        if (ExecQual((List *)action->whenqual, econtext)) {
            /*
             * We set up the projection earlier, so all we do here is
             * Project, no need for any other tasks prior to the
             * ExecInsert.
             */
            if (estate->es_result_insert_remoterel == NULL) {
                ExecProject(action->proj);
                /*
                 * ExecPrepareTupleRouting may modify the passed-in slot. Hence
                 * pass a local reference so that action->slot is not modified.
                 */
                myslot = mtstate->mt_mergeproj;
            } else {
                /* in pgxc we do projection in the remote query*/
                myslot = slot;

                /* Check if has constraints */
                if (resultRelationDesc->rd_att->constr) {
                    mtstate->mt_insert_constr_slot = ExtractConstraintTuple(mtstate, CMD_INSERT, slot, action->tupDesc);
                }
            }

            estate->es_result_remoterel = estate->es_result_insert_remoterel;

            (void)ExecInsertT<false>(mtstate, myslot, slot, estate, mtstate->canSetTag, hi_options, NULL, partExprKeyStr);

            InstrCountFiltered1(&mtstate->ps, 1);
        }
    }
}

/*
 * Creates the run-time state information for the Merge node
 */
void ExecInitMerge(ModifyTableState* mtstate, EState* estate, ResultRelInfo* resultRelInfo)
{
    ListCell* l = NULL;
    ExprContext* econtext = NULL;
    List* mergeMatchedActionStates = NIL;
    List* mergeNotMatchedActionStates = NIL;
    TupleDesc relationDesc = resultRelInfo->ri_RelationDesc->rd_att;
    ModifyTable* node = (ModifyTable*)mtstate->ps.plan;

    if (node->mergeActionList == NIL)
        return;

    mtstate->mt_merge_subcommands = 0;

    if (mtstate->ps.ps_ExprContext == NULL)
        ExecAssignExprContext(estate, &mtstate->ps);

    econtext = mtstate->ps.ps_ExprContext;

    /* initialize scan slot and constraint slot */
    mtstate->mt_scan_slot = NULL;
    mtstate->mt_update_constr_slot = NULL;
    mtstate->mt_insert_constr_slot = NULL;

    /* initialize slot for merge actions */
    Assert(mtstate->mt_mergeproj == NULL);
    mtstate->mt_mergeproj = ExecInitExtraTupleSlot(mtstate->ps.state);
    ExecSetSlotDescriptor(mtstate->mt_mergeproj, relationDesc);

    /*
     * Create a MergeActionState for each action on the mergeActionList
     * and add it to either a list of matched actions or not-matched
     * actions.
     */
    foreach (l, node->mergeActionList) {
        MergeAction* action = (MergeAction*)lfirst(l);
        MergeActionState* action_state = makeNode(MergeActionState);
        TupleDesc tupDesc;

        action_state->matched = action->matched;
        action_state->commandType = action->commandType;
        if (estate->es_is_flt_frame) {
            action_state->whenqual = ExecInitQualByFlatten((List*)action->qual, &mtstate->ps);
        } else {
            action_state->whenqual = ExecInitExprByRecursion((Expr*)action->qual, &mtstate->ps);
        }

        /* create target slot for this action's projection */
        tupDesc = ExecTypeFromTL((List*)action->targetList, false, true, relationDesc->td_tam_ops);
        action_state->tupDesc = tupDesc;

        if (IS_PGXC_DATANODE && CMD_UPDATE == action->commandType) {
            mtstate->mt_scan_slot = MakeSingleTupleTableSlot(tupDesc);
        }

        if (IS_PGXC_COORDINATOR && CMD_UPDATE == action->commandType && relationDesc->constr != NULL) {
            mtstate->mt_update_constr_slot = MakeSingleTupleTableSlot(tupDesc);
        }

        if (IS_PGXC_COORDINATOR && CMD_INSERT == action->commandType && relationDesc->constr != NULL) {
            mtstate->mt_insert_constr_slot = MakeSingleTupleTableSlot(tupDesc);
        }

        /* build action projection state */
        action_state->proj = ExecBuildProjectionInfo(action->targetList, econtext, mtstate->mt_mergeproj, &mtstate->ps, relationDesc);

        /*
         * We create two lists - one for WHEN MATCHED actions and one
         * for WHEN NOT MATCHED actions - and stick the
         * MergeActionState into the appropriate list.
         */
        if (action_state->matched)
            mergeMatchedActionStates = lappend(mergeMatchedActionStates, action_state);
        else
            mergeNotMatchedActionStates = lappend(mergeNotMatchedActionStates, action_state);

        switch (action->commandType) {
            case CMD_INSERT:
                ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc, action->targetList);
                mtstate->mt_merge_subcommands |= MERGE_INSERT;
                break;
            case CMD_UPDATE:
                ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc, action->targetList);
                mtstate->mt_merge_subcommands |= MERGE_UPDATE;
                break;
            default:
                Assert(0);
                break;
        }

        resultRelInfo->ri_mergeState->matchedActionStates = mergeMatchedActionStates;
        resultRelInfo->ri_mergeState->notMatchedActionStates = mergeNotMatchedActionStates;
    }
}