/* -------------------------------------------------------------------------
 *
 * ubtxlog.cpp
 *	  WAL replay logic for btrees.
 *
 *
 * Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
 * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/gausskernel/storage/access/ubtree/ubtxlog.cpp
 *
 * -------------------------------------------------------------------------
 */
#include "postgres.h"
#include "knl/knl_variable.h"

#include "access/nbtree.h"
#include "access/ubtree.h"
#include "access/transam.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "access/xlogproc.h"
#include "access/ubtreepcr.h"

#include "storage/procarray.h"
#include "miscadmin.h"
#include "pgxc/pgxc.h"
#include "access/multi_redo_api.h"
#include "access/parallel_recovery/dispatcher.h"

#ifdef ENABLE_UT
#define static
#endif

/*
 * We must keep track of expected insertions due to page splits, and apply
 * them manually if they are not seen in the WAL log during replay.  This
 * makes it safe for page insertion to be a multiple-WAL-action process.
 *
 * Similarly, deletion of an only child page and deletion of its parent page
 * form multiple WAL log entries, and we have to be prepared to follow through
 * with the deletion if the log ends between.
 *
 * The data structure is a simple linked list --- this should be good enough,
 * since we don't expect a page split or multi deletion to remain incomplete
 * for long.  In any case we need to respect the order of operations.
 */
/*
 * One entry of the pending-action list (t_thrd.xlog_cxt.incomplete_actions).
 *
 * An entry describes either a page split whose downlink insertion has not
 * been replayed yet (is_split == true) or a page deletion that still has to
 * be followed through (is_split == false).  Only the field group that
 * corresponds to is_split is meaningful for a given entry.
 */
typedef struct UBTreeIncompleteAction {
    RelFileNode node; /* the index */
    bool is_split;    /* T = pending split, F = pending delete */
    /* these fields are for a split: */
    bool is_root;         /* we split the root */
    BlockNumber leftblk;  /* left half of split */
    BlockNumber rightblk; /* right half of split; the key ForgetMatchingSplit() matches on */
    uint32 level;         /* only read by the trace logging in the visible code -- TODO confirm other uses */
    /* these fields are for a delete: */
    BlockNumber delblk; /* parent block to be deleted; the key ForgetMatchingDeletion() matches on */
} UBTreeIncompleteAction;

static void LogIncompleteSplit(const RelFileNode *node, BlockNumber leftblk, BlockNumber rightblk, bool is_root)
{
    MemoryContext oldCtx = NULL;
    if (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull())) {
        oldCtx = MemoryContextSwitchTo(g_instance.comm_cxt.predo_cxt.parallelRedoCtx);
    }

    UBTreeIncompleteAction *action = (UBTreeIncompleteAction *)palloc(sizeof(UBTreeIncompleteAction));
    if (log_min_messages <= DEBUG4) {
        ereport(LOG, (errmsg("[BTREE_ACTION_TRACE]LogIncompleteSplit: spc:%u,db:%u,rel:%u,"
                             "leftblk:%u,rightblk:%u,is_root:%d",
                             node->spcNode, node->dbNode, node->relNode, leftblk, rightblk, is_root)));
    }
    action->node = *node;
    action->is_split = true;
    action->is_root = is_root;
    action->leftblk = leftblk;
    action->rightblk = rightblk;
    t_thrd.xlog_cxt.incomplete_actions = lappend(t_thrd.xlog_cxt.incomplete_actions, action);

    if (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull())) {
        (void)MemoryContextSwitchTo(oldCtx);
    }
}

static void ForgetMatchingSplit(const RelFileNode *node, BlockNumber downlink, bool is_root)
{
    ListCell *l = NULL;
    if (log_min_messages <= DEBUG4) {
        ereport(LOG, (errmsg("[BTREE_ACTION_TRACE]ForgetMatchingSplit begin: spc:%u,db:%u,rel:%u,"
                             "downlink:%u, is_root:%d",
                             node->spcNode, node->dbNode, node->relNode, downlink, is_root)));
    }
    MemoryContext oldCtx = NULL;
    if (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull())) {
        oldCtx = MemoryContextSwitchTo(g_instance.comm_cxt.predo_cxt.parallelRedoCtx);
    }
    foreach (l, t_thrd.xlog_cxt.incomplete_actions) {
        UBTreeIncompleteAction *action = (UBTreeIncompleteAction *)lfirst(l);

        if (RelFileNodeEquals(*node, action->node) && action->is_split && downlink == action->rightblk) {
            if (log_min_messages <= DEBUG4) {
                ereport(LOG,
                        (errmsg("[BTREE_ACTION_TRACE]ForgetMatchingSplit successfully: input spc:%u,db:%u,rel:%u,"
                                "downlink:%u, is_root:%d, action spc:%u,db:%u,rel:%u,"
                                "is_split:%d,is_root:%d,leftblk:%u,rightblk:%u,level:%u,delblk:%u",
                                node->spcNode, node->dbNode, node->relNode, downlink, is_root, action->node.spcNode,
                                action->node.dbNode, action->node.relNode, action->is_split, action->is_root,
                                action->leftblk, action->rightblk, action->level, action->delblk)));
            }
            if (is_root != action->is_root)
                ereport(LOG, (errmsg("ForgetMatchingSplit: fishy is_root data (expected %d, got %d)", action->is_root,
                                     is_root)));
            t_thrd.xlog_cxt.incomplete_actions = list_delete_ptr(t_thrd.xlog_cxt.incomplete_actions, action);
            pfree(action);
            break; /* need not look further */
        }
    }
    if (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull())) {
        (void)MemoryContextSwitchTo(oldCtx);
    }
}

static void LogIncompleteDeletion(const RelFileNode *node, BlockNumber delblk)
{
    MemoryContext oldCtx = NULL;
    if (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull())) {
        oldCtx = MemoryContextSwitchTo(g_instance.comm_cxt.predo_cxt.parallelRedoCtx);
    }
    UBTreeIncompleteAction *action = (UBTreeIncompleteAction *)palloc(sizeof(UBTreeIncompleteAction));
    if (log_min_messages <= DEBUG4) {
        ereport(LOG, (errmsg("[BTREE_ACTION_TRACE]LogIncompleteDeletion: spc:%u,db:%u,rel:%u,"
                             "delblk:%u",
                             node->spcNode, node->dbNode, node->relNode, delblk)));
    }
    action->node = *node;
    action->is_split = false;
    action->delblk = delblk;
    t_thrd.xlog_cxt.incomplete_actions = lappend(t_thrd.xlog_cxt.incomplete_actions, action);
    if (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull())) {
        (void)MemoryContextSwitchTo(oldCtx);
    }
}

static void ForgetMatchingDeletion(const RelFileNode *node, BlockNumber delblk)
{
    ListCell *l = NULL;

    MemoryContext oldCtx = NULL;
    if (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull())) {
        oldCtx = MemoryContextSwitchTo(g_instance.comm_cxt.predo_cxt.parallelRedoCtx);
    }

    if (SHOW_DEBUG_MESSAGE()) {
        ereport(LOG,
                (errmsg("[BTREE_ACTION_TRACE]ForgetMatchingDeletion begin: spc:%u,db:%u,rel:%u,"
                        "delblk:%u",
                        node->spcNode,
                        node->dbNode,
                        node->relNode,
                        delblk)));
    }

    foreach (l, t_thrd.xlog_cxt.incomplete_actions) {
        UBTreeIncompleteAction *action = (UBTreeIncompleteAction *)lfirst(l);

        if (RelFileNodeEquals(*node, action->node) && !action->is_split && delblk == action->delblk) {
            if (SHOW_DEBUG_MESSAGE()) {
                ereport(LOG,
                        (errmsg("[BTREE_ACTION_TRACE]ForgetMatchingDeletion successfully: input spc:%u,db:%u,rel:%u,"
                                "delblk:%u, action spc:%u,db:%u,rel:%u,"
                                "is_split:%d,is_root:%d,leftblk:%u,rightblk:%u,level:%u,delblk:%u",
                                node->spcNode, node->dbNode, node->relNode, delblk, action->node.spcNode,
                                action->node.dbNode, action->node.relNode, action->is_split, action->is_root,
                                action->leftblk, action->rightblk, action->level, action->delblk)));
            }

            t_thrd.xlog_cxt.incomplete_actions = list_delete_ptr(t_thrd.xlog_cxt.incomplete_actions, action);
            pfree(action);
            break; /* need not look further */
        }
    }
    if (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull())) {
        (void)MemoryContextSwitchTo(oldCtx);
    }
}

/*
 * UBTreeRestoreMeta -- re-create the metapage from the data attached to a
 * block reference of the record.
 *
 * record:   the WAL record being replayed
 * block_id: block reference that carries the metapage
 *
 * The page is initialized rather than read (XLogInitBufferForRedo), filled
 * in by UBTreeRestoreMetaOperatorPage(), marked dirty and released.
 */
static void UBTreeRestoreMeta(XLogReaderState *record, uint8 block_id)
{
    RedoBufferInfo metaInfo;
    XLogInitBufferForRedo(record, block_id, &metaInfo);

    Size payloadLen;
    char *payload = XLogRecGetBlockData(record, block_id, &payloadLen);
    UBTreeRestoreMetaOperatorPage(&metaInfo, (void *)payload, payloadLen);

    MarkBufferDirty(metaInfo.buf);
    UnlockReleaseBuffer(metaInfo.buf);
}

/*
 * UBTreeClearIncompleteSplit -- reset the INCOMPLETE_SPLIT flag of a page
 *
 * Shared helper for the redo routines of every record type that can insert
 * a downlink (insert, split and newroot).
 *
 * record:   the WAL record being replayed
 * block_id: block reference of the page whose flag is to be cleared
 *
 * The page is only modified when the redo machinery says it needs redo; the
 * buffer is released whenever a valid one was returned.
 */
static void UBTreeClearIncompleteSplit(XLogReaderState *record, uint8 block_id)
{
    RedoBufferInfo childInfo;
    const bool needsRedo = (XLogReadBufferForRedo(record, block_id, &childInfo) == BLK_NEEDS_REDO);

    if (needsRedo) {
        UBTreeXlogClearIncompleteSplit<UBTPageOpaqueInternal>(&childInfo);
        MarkBufferDirty(childInfo.buf);
    }

    if (BufferIsValid(childInfo.buf)) {
        UnlockReleaseBuffer(childInfo.buf);
    }
}

static void UBTreeXlogInsert(bool isleaf, bool ismeta, XLogReaderState *record, bool issplitupgrade)
{
    xl_btree_insert *xlrec = (xl_btree_insert *)XLogRecGetData(record);
    RelFileNode rnode;
    RedoBufferInfo buffer;
    char *datapos = NULL;
    BlockNumber downlink = 0;

    /*
     * Insertion to an internal page finishes an incomplete split at the child
     * level.  Clear the incomplete-split flag in the child.  Note: during
     * normal operation, the child and parent pages are locked at the same
     * time, so that clearing the flag and inserting the downlink appear
     * atomic to other backends.  We don't bother with that during replay,
     * because readers don't care about the incomplete-split flag and there
     * cannot be updates happening.
     */
    if (!issplitupgrade) {
        XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
        if (!isleaf) {
            datapos = (char *)xlrec + SizeOfBtreeInsert;
            errno_t rc = memcpy_s(&downlink, sizeof(BlockNumber), datapos, sizeof(BlockNumber));
            securec_check(rc, "\0", "\0");
        }
    } else {
        if (!isleaf) {
            UBTreeClearIncompleteSplit(record, BTREE_INSERT_CHILD_BLOCK_NUM);
        }
    }

    if (XLogReadBufferForRedo(record, BTREE_INSERT_ORIG_BLOCK_NUM, &buffer) == BLK_NEEDS_REDO) {
        Size datalen;

        datapos = XLogRecGetBlockData(record, BTREE_INSERT_ORIG_BLOCK_NUM, &datalen);
        UBTreeXlogInsertOperatorPage(&buffer, (void *)xlrec, (void *)datapos, datalen);
        MarkBufferDirty(buffer.buf);
    }

    if (BufferIsValid(buffer.buf)) {
        UnlockReleaseBuffer(buffer.buf);
    }

    /*
     * Note: in normal operation, we'd update the metapage while still holding
     * lock on the page we inserted into.  But during replay it's not
     * necessary to hold that lock, since no other index updates can be
     * happening concurrently, and readers will cope fine with following an
     * obsolete link from the metapage.
     */
    if (!issplitupgrade) {
        if (ismeta) {
            UBTreeRestoreMeta(record, 1);
        }
        /* Forget any split this insertion completes */
        if (!isleaf) {
            ForgetMatchingSplit(&rnode, downlink, false);
        }
    } else {
        if (ismeta) {
            UBTreeRestoreMeta(record, BTREE_INSERT_META_BLOCK_NUM);
        }
    }
}

static void UBTreeXlogSplitUpdate(bool onleft, bool isroot, XLogReaderState *record, bool hasOpaque)
{
    Size datalen;
    char *datapos = NULL;
    RelFileNode rnode;
    BlockNumber leftsib;
    BlockNumber rightsib;
    BlockNumber rnext;

    XLogRecGetBlockTag(record, BTREE_SPLIT_LEFT_BLOCK_NUM, &rnode, NULL, &leftsib);
    XLogRecGetBlockTag(record, BTREE_SPLIT_RIGHT_BLOCK_NUM, NULL, NULL, &rightsib);
    if (!XLogRecGetBlockTag(record, BTREE_SPLIT_RIGHTNEXT_BLOCK_NUM, NULL, NULL, &rnext)) {
        rnext = P_NONE;
    }

    xl_ubtree_split *xlrec = (xl_ubtree_split *)XLogRecGetData(record);
    bool isleaf = (xlrec->level == 0);

    if (!isleaf) {
        UBTreeClearIncompleteSplit(record, BTREE_SPLIT_CHILD_BLOCK_NUM);
    }

    /* Reconstruct right (new) sibling page from scratch */
    RedoBufferInfo rbuf;
    XLogInitBufferForRedo(record, BTREE_SPLIT_RIGHT_BLOCK_NUM, &rbuf);

    datapos = XLogRecGetBlockData(record, BTREE_SPLIT_RIGHT_BLOCK_NUM, &datalen);
    UBTreeXlogSplitOperatorRightPage(&rbuf, (void *)xlrec, leftsib, rnext, (void *)datapos, datalen, hasOpaque);
    MarkBufferDirty(rbuf.buf);

    RedoBufferInfo lbuf;
    if (XLogReadBufferForRedo(record, BTREE_SPLIT_LEFT_BLOCK_NUM, &lbuf) == BLK_NEEDS_REDO) {
        datapos = XLogRecGetBlockData(record, BTREE_SPLIT_LEFT_BLOCK_NUM, &datalen);
        UBTreeXlogSplitOperatorLeftpage(&lbuf, (void *)xlrec, rightsib, onleft, (void *)datapos, datalen, hasOpaque);
        MarkBufferDirty(lbuf.buf);
    }

    if (BufferIsValid(lbuf.buf)) {
        UnlockReleaseBuffer(lbuf.buf);
    }
    UnlockReleaseBuffer(rbuf.buf);

    if (rnext != P_NONE) {
        RedoBufferInfo buffer;

        if (XLogReadBufferForRedo(record, BTREE_SPLIT_RIGHTNEXT_BLOCK_NUM, &buffer) == BLK_NEEDS_REDO) {
            UBTreeXlogSplitOperatorNextpage(&buffer, rightsib);
            MarkBufferDirty(buffer.buf);
        }
        if (BufferIsValid(buffer.buf)) {
            UnlockReleaseBuffer(buffer.buf);
        }
    }
}

/*
 * UBTreeXlogSplit -- replay a page split record.
 *
 * onleft:         the new item went to the left page; its image is then
 *                 carried in the data of block reference 0
 * isroot:         stored as is_root in the pending-split entry logged at the
 *                 end of the legacy path
 * record:         the WAL record being replayed
 * issplitupgrade: if true, the whole replay is delegated to
 *                 UBTreeXlogSplitUpdate()
 * hasOpaque:      only forwarded to UBTreeXlogSplitUpdate()
 *
 * Block references on the legacy path: 0 = left (original) page, 1 = right
 * (new) page, 2 = page to the right of the new page, if there is one.
 *
 * On the legacy path the split is registered with LogIncompleteSplit() once
 * all pages have been handled, and a non-leaf split first forgets the
 * pending split it completes.
 */
static void UBTreeXlogSplit(bool onleft, bool isroot, XLogReaderState *record, bool issplitupgrade, bool hasOpaque)
{
    if (issplitupgrade) {
        UBTreeXlogSplitUpdate(onleft, isroot, record, hasOpaque);
        return;
    }

    XLogRecPtr lsn = record->EndRecPtr;
    xl_ubtree_split *xlrec = (xl_ubtree_split *)XLogRecGetData(record);
    bool isleaf = (xlrec->level == 0);
    RedoBufferInfo lbuf;
    RedoBufferInfo rbuf;
    Page rpage;
    UBTPageOpaqueInternal ropaque;
    char *datapos = NULL;
    Size datalen;
    Item left_hikey = NULL;
    Size left_hikeysz = 0;
    RelFileNode rnode;
    BlockNumber leftsib = InvalidBlockNumber;
    BlockNumber rightsib = InvalidBlockNumber;
    BlockNumber rnext;

    /* Block 2 is optional: without it the new right page is the rightmost page. */
    XLogRecGetBlockTag(record, 0, &rnode, NULL, &leftsib);
    XLogRecGetBlockTag(record, 1, NULL, NULL, &rightsib);
    if (!XLogRecGetBlockTag(record, 2, NULL, NULL, &rnext)) {
        rnext = P_NONE;
    }

    /* Forget any split this insertion completes */
    if (!isleaf) {
        BlockNumber downlink;
        /* we assume SizeOfBtreeSplit is at least 16-bit aligned */
        datapos = (char *)xlrec + SizeOfBtreeSplit;
        downlink = BlockIdGetBlockNumber((BlockId)datapos);
        ForgetMatchingSplit(&rnode, downlink, false);
    }

    /* Reconstruct right (new) sibling page from scratch */
    XLogInitBufferForRedo(record, 1, &rbuf);
    datapos = XLogRecGetBlockData(record, 1, &datalen);
    rpage = rbuf.pageinfo.page;

    UBTreePageInit(rpage, rbuf.pageinfo.pagesize);
    ropaque = (UBTPageOpaqueInternal)PageGetSpecialPointer(rpage);

    ropaque->btpo_prev = leftsib;
    ropaque->btpo_next = rnext;
    ropaque->btpo.level = xlrec->level;
    ropaque->btpo_flags = isleaf ? BTP_LEAF : 0;
    ropaque->btpo_cycleid = 0;

    /* Fill the freshly initialized right page with the items carried by block 1. */
    UBTreeRestorePage(rpage, datapos, (int)datalen);

    /*
     * On leaf level, the high key of the left page is equal to the first key
     * on the right page.  left_hikey points into rpage here, which is why
     * rbuf must stay locked until the left page has been rebuilt.
     */
    if (isleaf) {
        ItemId hiItemId = PageGetItemId(rpage, P_FIRSTDATAKEY(ropaque));

        left_hikey = PageGetItem(rpage, hiItemId);
        left_hikeysz = ItemIdGetLength(hiItemId);
    }

    PageSetLSN(rpage, lsn);
    MarkBufferDirty(rbuf.buf);
    /*
     * Don't release the right buffer yet; we touch the right page's first
     * item below.  Now reconstruct the left (original) sibling page.
     */
    if (XLogReadBufferForRedo(record, 0, &lbuf) == BLK_NEEDS_REDO) {
        /*
         * To retain the same physical order of the tuples that they had, we
         * initialize a temporary empty page for the left page and add all the
         * items to that in item number order.  This mirrors how _bt_split()
         * works.  It's not strictly required to retain the same physical
         * order, as long as the items are in the correct item number order,
         * but it helps debugging.  See also BtreeRestorePage(), which does
         * the same for the right page.
         */
        Page lpage = lbuf.pageinfo.page;
        UBTPageOpaqueInternal lopaque = (UBTPageOpaqueInternal)PageGetSpecialPointer(lpage);
        OffsetNumber off;
        Item newitem = NULL;
        Size newitemsz = 0;
        Page newlpage;
        OffsetNumber leftoff;

        datapos = XLogRecGetBlockData(record, 0, &datalen);

        /*
         * Extract left hikey and its size (assuming 16-bit alignment).
         * Block 0's data holds, in this order: the left high key (non-leaf
         * splits only) and the new item (only if it went to the left page).
         */

        if (!isleaf) {
            left_hikey = (Item)datapos;
            left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
            datapos += left_hikeysz;
            datalen -= left_hikeysz;
        }

        if (onleft) {
            newitem = (Item)datapos;
            newitemsz = MAXALIGN(IndexTupleSize(newitem));
            datapos += newitemsz;
            datalen -= newitemsz;
        }

        /* everything attached to block 0 must have been consumed by now */
        Assert(datalen == 0);

        /* assure that memory is properly allocated, prevent from core dump caused by buffer unpin */
        START_CRIT_SECTION();
        newlpage = PageGetTempPageCopySpecial(lpage);
        END_CRIT_SECTION();

        /* Set high key */
        leftoff = P_HIKEY;
        if (PageAddItem(newlpage, left_hikey, left_hikeysz, P_HIKEY, false, false) == InvalidOffsetNumber)
            ereport(PANIC, (errmsg("failed to add high key to left page after split")));
        leftoff = OffsetNumberNext(leftoff);

        /* sanity check on the record before it is used as a loop bound */
        if (xlrec->firstright > MaxIndexTuplesPerPage) {
            ereport(ERROR, (errmodule(MOD_REDO), errmsg("Exceeded the maximum number of tuples on the page")));
        }

        /* Copy the items that stay on the left page, i.e. those before firstright. */
        for (off = P_FIRSTDATAKEY(lopaque); off < xlrec->firstright; off++) {
            ItemId itemid;
            Size itemsz;
            Item item;

            /* add the new item if it was inserted on left page */
            if (onleft && off == xlrec->newitemoff) {
                if (PageAddItem(newlpage, newitem, newitemsz, leftoff, false, false) == InvalidOffsetNumber)
                    ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
                            errmsg("failed to add new item to left page after split")));
                leftoff = OffsetNumberNext(leftoff);
            }

            itemid = PageGetItemId(lpage, off);
            itemsz = ItemIdGetLength(itemid);
            item = PageGetItem(lpage, itemid);
            if (PageAddItem(newlpage, item, itemsz, leftoff, false, false) == InvalidOffsetNumber)
                ereport(ERROR,
                        (errcode(ERRCODE_INDEX_CORRUPTED), errmsg("failed to add old item to left page after split")));
            leftoff = OffsetNumberNext(leftoff);
        }

        /* cope with possibility that newitem goes at the end */
        if (onleft && off == xlrec->newitemoff) {
            if (PageAddItem(newlpage, newitem, newitemsz, leftoff, false, false) == InvalidOffsetNumber)
                ereport(ERROR,
                        (errcode(ERRCODE_INDEX_CORRUPTED), errmsg("failed to add new item to left page after split")));
            leftoff = OffsetNumberNext(leftoff);
        }

        /* Install the rebuilt image over the original left page. */
        PageRestoreTempPage(newlpage, lpage);

        /* Fix opaque fields */
        lopaque = (UBTPageOpaqueInternal)PageGetSpecialPointer(lpage);
        lopaque->btpo_flags = isleaf ? BTP_LEAF : 0;
        lopaque->btpo_next = rightsib;
        lopaque->btpo_cycleid = 0;

        PageSetLSN(lpage, lsn);
        MarkBufferDirty(lbuf.buf);
    }

    /* We no longer need the buffers */
    if (BufferIsValid(lbuf.buf)) {
        UnlockReleaseBuffer(lbuf.buf);
    }
    UnlockReleaseBuffer(rbuf.buf);

    /*
     * Fix left-link of the page to the right of the new right sibling.
     *
     * Note: in normal operation, we do this while still holding lock on the
     * two split pages.  However, that's not necessary for correctness in WAL
     * replay, because no other index update can be in progress, and readers
     * will cope properly when following an obsolete left-link.
     */
    if (rnext != P_NONE) {
        RedoBufferInfo buffer;

        if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO) {
            Page page = buffer.pageinfo.page;
            UBTPageOpaqueInternal pageop = (UBTPageOpaqueInternal)PageGetSpecialPointer(page);

            pageop->btpo_prev = rightsib;

            PageSetLSN(page, lsn);
            MarkBufferDirty(buffer.buf);
        }
        if (BufferIsValid(buffer.buf)) {
            UnlockReleaseBuffer(buffer.buf);
        }
    }

    /* The downlink for the new right page has not been replayed yet; remember the split. */
    LogIncompleteSplit(&rnode, leftsib, rightsib, isroot);
}

/*
 * UBTreeXlogVacuum -- replay a vacuum record for one index page.
 *
 * record: the WAL record being replayed
 *
 * Two steps: when hot standby is active on a single node, take and drop a
 * cleanup lock on every block between lastBlockVacuumed and the target
 * block; then apply the vacuum to the target block under a cleanup lock.
 */
static void UBTreeXlogVacuum(XLogReaderState *record)
{
    xl_btree_vacuum *vacRec = (xl_btree_vacuum *)XLogRecGetData(record);

    /*
     * While queries may be running, every page between lastBlockVacuumed and
     * the current block must be seen unpinned, so that replay of the VACUUM
     * cannot get to removing heap tuples while index scans towards those
     * tuples are still in flight (see nbtree/README).
     *
     * VACUUM may visit leaf pages out of order, so lastBlockVacuumed >= the
     * current block is legal and simply leaves nothing to do.
     *
     * All pages in the range are locked, including non-leaf and all-zero
     * ones; avoiding that is not worth the complexity.
     */
    if (HotStandbyActive() && (g_instance.role == VSINGLENODE)) {
        RelFileNode indexNode;
        BlockNumber targetBlk;

        XLogRecGetBlockTag(record, BTREE_VACUUM_ORIG_BLOCK_NUM, &indexNode, NULL, &targetBlk);

        BlockNumber scanBlk = vacRec->lastBlockVacuumed + 1;
        while (scanBlk < targetBlk) {
            /*
             * RBM_NORMAL_NO_LOG is used because an all-zero page is not an
             * error here: the original vacuum scan would have skipped such
             * pages without initializing them.
             *
             * XXX the block does not really have to be read, it only has to
             * be confirmed as unpinned.
             */
            Buffer pinned = XLogReadBufferExtended(indexNode, MAIN_FORKNUM, scanBlk, RBM_NORMAL_NO_LOG, NULL);
            if (BufferIsValid(pinned)) {
                LockBufferForCleanup(pinned);
                UnlockReleaseBuffer(pinned);
            }
            scanBlk++;
        }
    }

    /*
     * As in btvacuumpage(), the page being vacuumed is taken with a cleanup
     * lock.  See nbtree/README for details.
     */
    RedoBufferInfo leafInfo;
    if (XLogReadBufferForRedoExtended(record, BTREE_VACUUM_ORIG_BLOCK_NUM, RBM_NORMAL, true, &leafInfo) ==
        BLK_NEEDS_REDO) {
        Size payloadLen;
        char *payload = XLogRecGetBlockData(record, BTREE_VACUUM_ORIG_BLOCK_NUM, &payloadLen);

        UBTreeXlogVacuumOperatorPage(&leafInfo, (void *)vacRec, (void *)payload, payloadLen);
        MarkBufferDirty(leafInfo.buf);
    }
    if (BufferIsValid(leafInfo.buf)) {
        UnlockReleaseBuffer(leafInfo.buf);
    }
}

/*
 * UBTreeXlogDelete -- replay the removal of index tuples from one page.
 *
 * record: the WAL record being replayed
 *
 * Delete records can in principle conflict with standby queries, and any
 * such conflict handling would have to precede the page update.  None is
 * done here.
 *
 * XXX: In MPPDB, we don't support hot_standby query on standby.
 */
static void UBTreeXlogDelete(XLogReaderState *record)
{
    RedoBufferInfo pageInfo;
    const bool needsRedo = (XLogReadBufferForRedo(record, BTREE_DELETE_ORIG_BLOCK_NUM, &pageInfo) == BLK_NEEDS_REDO);

    if (needsRedo) {
        /* the operator works from the record's main data */
        void *mainData = (void *)XLogRecGetData(record);

        UBTreeXlogDeleteOperatorPage(&pageInfo, mainData, XLogRecGetDataLen(record));
        MarkBufferDirty(pageInfo.buf);
    }

    if (BufferIsValid(pageInfo.buf)) {
        UnlockReleaseBuffer(pageInfo.buf);
    }
}

/*
 * UBTreeXlogDeletePage -- replay a page deletion logged in the layout that
 * uses xl_btree_delete_page.
 *
 * info:   record type; XLOG_UBTREE_MARK_PAGE_HALFDEAD and
 *         XLOG_UBTREE_UNLINK_PAGE_META get special treatment below
 * record: the WAL record being replayed
 *
 * Block references: 0 = target page, 1 = left sibling, 2 = right sibling,
 * 3 = parent, 4 = metapage (restored only for XLOG_UBTREE_UNLINK_PAGE_META).
 *
 * Pages are processed one at a time: parent, right sibling, left sibling
 * (if any), then the target page, which is rewritten as an empty deleted
 * page.  Finally the pending-action list is updated.
 */
static void UBTreeXlogDeletePage(uint8 info, XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    xl_btree_delete_page *xlrec = (xl_btree_delete_page *)XLogRecGetData(record);
    RelFileNode rnode;
    BlockNumber parent = InvalidBlockNumber;
    BlockNumber target = InvalidBlockNumber;
    BlockNumber leftsib = InvalidBlockNumber;
    BlockNumber rightsib = InvalidBlockNumber;
    RedoBufferInfo buffer;
    Page page;
    UBTPageOpaqueInternal pageop;

    /* siblings come from the record body, target and parent from the block references */
    leftsib = xlrec->leftblk;
    rightsib = xlrec->rightblk;

    XLogRecGetBlockTag(record, 0, &rnode, NULL, &target);
    XLogRecGetBlockTag(record, 3, NULL, NULL, &parent);

    /*
     * In normal operation, we would lock all the pages this WAL record
     * touches before changing any of them.  In WAL replay, it should be okay
     * to lock just one page at a time, since no concurrent index updates can
     * be happening, and readers should not care whether they arrive at the
     * target page or not (since it's surely empty).
     *
     * parent page
     */
    if (XLogReadBufferForRedo(record, 3, &buffer) == BLK_NEEDS_REDO) {
        OffsetNumber poffset, maxoff;

        page = buffer.pageinfo.page;

        pageop = (UBTPageOpaqueInternal)PageGetSpecialPointer(page);
        poffset = xlrec->poffset;
        maxoff = PageGetMaxOffsetNumber(page);
        if (poffset >= maxoff) {
            /*
             * The downlink is the last item on the parent: remove it and mark
             * the parent half-dead.  The asserts state that this only happens
             * for XLOG_UBTREE_MARK_PAGE_HALFDEAD and that the downlink is
             * also the first data item.
             */
            Assert(info == XLOG_UBTREE_MARK_PAGE_HALFDEAD);
            Assert(poffset == P_FIRSTDATAKEY(pageop));
            PageIndexTupleDelete(page, poffset);
            pageop->btpo_flags |= BTP_HALF_DEAD;
        } else {
            ItemId itemid;
            IndexTuple itup;
            OffsetNumber nextoffset;

            /*
             * Otherwise redirect the target's downlink to the right sibling
             * and delete the item that follows it.
             */
            Assert(info != XLOG_UBTREE_MARK_PAGE_HALFDEAD);
            itemid = PageGetItemId(page, poffset);
            itup = (IndexTuple)PageGetItem(page, itemid);
            ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY);
            nextoffset = OffsetNumberNext(poffset);
            PageIndexTupleDelete(page, nextoffset);
        }

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer.buf);
    }
    if (BufferIsValid(buffer.buf))
        UnlockReleaseBuffer(buffer.buf);

    /* Fix left-link of right sibling */
    if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO) {
        page = buffer.pageinfo.page;

        pageop = (UBTPageOpaqueInternal)PageGetSpecialPointer(page);
        pageop->btpo_prev = leftsib;

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer.buf);
    }
    if (BufferIsValid(buffer.buf))
        UnlockReleaseBuffer(buffer.buf);

    /* Fix right-link of left sibling, if any */
    if (leftsib != P_NONE) {
        if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO) {
            page = buffer.pageinfo.page;

            pageop = (UBTPageOpaqueInternal)PageGetSpecialPointer(page);
            pageop->btpo_next = rightsib;

            PageSetLSN(page, lsn);
            MarkBufferDirty(buffer.buf);
        }
        if (BufferIsValid(buffer.buf))
            UnlockReleaseBuffer(buffer.buf);
    }

    /* Rewrite target page as empty deleted page (initialized, not read) */
    XLogInitBufferForRedo(record, 0, &buffer);
    page = buffer.pageinfo.page;

    UBTreePageInit(page, buffer.pageinfo.pagesize);
    pageop = (UBTPageOpaqueInternal)PageGetSpecialPointer(page);

    pageop->btpo_prev = leftsib;
    pageop->btpo_next = rightsib;
    pageop->btpo_flags = BTP_DELETED;
    pageop->btpo_cycleid = 0;
    /* the xid from the record is stored through the UBTPageOpaque view of the special area */
    ((UBTPageOpaque)pageop)->xact = xlrec->btpo_xact;

    PageSetLSN(page, lsn);
    MarkBufferDirty(buffer.buf);
    UnlockReleaseBuffer(buffer.buf);

    /* Update metapage if needed */
    if (info == XLOG_UBTREE_UNLINK_PAGE_META)
        UBTreeRestoreMeta(record, 4);

    /* Forget any completed deletion */
    ForgetMatchingDeletion(&rnode, target);

    /* If parent became half-dead, remember it for deletion */
    if (info == XLOG_UBTREE_MARK_PAGE_HALFDEAD) {
        LogIncompleteDeletion(&rnode, parent);
    }
}

/*
 * UBTreeXlogMarkPageHalfDead -- replay the marking of a leaf page as
 * half-dead.
 *
 * info:   record type; not used by this routine
 * record: the WAL record being replayed
 *
 * Normal operation locks every page the record touches before changing any
 * of them.  Replay handles one page at a time instead: no index update can
 * run concurrently, and readers do not care whether they reach the (empty)
 * target page.
 */
static void UBTreeXlogMarkPageHalfDead(uint8 info, XLogReaderState *record)
{
    xl_btree_mark_page_halfdead *halfDeadRec = (xl_btree_mark_page_halfdead *)XLogRecGetData(record);

    /* Step 1: the parent page, only if it needs redo. */
    RedoBufferInfo parentInfo;
    const bool parentNeedsRedo =
        (XLogReadBufferForRedo(record, BTREE_HALF_DEAD_PARENT_PAGE_NUM, &parentInfo) == BLK_NEEDS_REDO);
    if (parentNeedsRedo) {
        UBTreeXlogHalfdeadPageOperatorParentpage(&parentInfo, halfDeadRec);
        MarkBufferDirty(parentInfo.buf);
    }
    if (BufferIsValid(parentInfo.buf)) {
        UnlockReleaseBuffer(parentInfo.buf);
    }

    /* Step 2: the leaf page, which is always initialized and rewritten. */
    RedoBufferInfo leafInfo;
    XLogInitBufferForRedo(record, BTREE_HALF_DEAD_LEAF_PAGE_NUM, &leafInfo);
    UBTreeXlogHalfdeadPageOperatorLeafpage(&leafInfo, halfDeadRec);
    MarkBufferDirty(leafInfo.buf);
    UnlockReleaseBuffer(leafInfo.buf);
}

/*
 * UBTreeXlogUnlinkPage -- replay the unlinking of a page from its siblings.
 *
 * info:   record type; XLOG_UBTREE_UNLINK_PAGE_META additionally restores
 *         the metapage
 * record: the WAL record being replayed
 *
 * Normal operation locks every page the record touches before changing any
 * of them.  Replay handles one page at a time instead: no index update can
 * run concurrently, and readers do not care whether they reach the (empty)
 * target page.
 */
static void UBTreeXlogUnlinkPage(uint8 info, XLogReaderState *record)
{
    xl_btree_unlink_page *unlinkRec = (xl_btree_unlink_page *)XLogRecGetData(record);

    /* captured before any page operator runs */
    const BlockNumber leftSibling = unlinkRec->leftsib;

    /* Right sibling: repair its left-link. */
    RedoBufferInfo rightInfo;
    if (XLogReadBufferForRedo(record, BTREE_UNLINK_PAGE_RIGHT_NUM, &rightInfo) == BLK_NEEDS_REDO) {
        UBTreeXlogUnlinkPageOperatorRightpage(&rightInfo, unlinkRec);
        MarkBufferDirty(rightInfo.buf);
    }
    if (BufferIsValid(rightInfo.buf)) {
        UnlockReleaseBuffer(rightInfo.buf);
    }

    /* Left sibling, when there is one: repair its right-link. */
    if (leftSibling != P_NONE) {
        RedoBufferInfo leftInfo;
        if (XLogReadBufferForRedo(record, BTREE_UNLINK_PAGE_LEFT_NUM, &leftInfo) == BLK_NEEDS_REDO) {
            UBTreeXlogUnlinkPageOperatorLeftpage(&leftInfo, unlinkRec);
            MarkBufferDirty(leftInfo.buf);
        }
        if (BufferIsValid(leftInfo.buf)) {
            UnlockReleaseBuffer(leftInfo.buf);
        }
    }

    /* Target page: rewritten as an empty deleted page. */
    RedoBufferInfo targetInfo;
    XLogInitBufferForRedo(record, BTREE_UNLINK_PAGE_CUR_PAGE_NUM, &targetInfo);
    UBTreeXlogUnlinkPageOperatorCurpage(&targetInfo, unlinkRec);
    MarkBufferDirty(targetInfo.buf);
    UnlockReleaseBuffer(targetInfo.buf);

    /*
     * When a parent of the targeted leaf was deleted rather than the leaf
     * itself, the leaf has to point to the next remaining child in the
     * branch.  The leaf holds no real data, so it is simply re-created from
     * the information in the record.
     */
    if (XLogRecHasBlockRef(record, BTREE_UNLINK_PAGE_CHILD_NUM)) {
        RedoBufferInfo childInfo;
        XLogInitBufferForRedo(record, BTREE_UNLINK_PAGE_CHILD_NUM, &childInfo);
        UBTreeXlogUnlinkPageOperatorChildpage(&childInfo, unlinkRec);
        MarkBufferDirty(childInfo.buf);
        UnlockReleaseBuffer(childInfo.buf);
    }

    /* Metapage, for the record type that carries one. */
    if (info == XLOG_UBTREE_UNLINK_PAGE_META) {
        UBTreeRestoreMeta(record, BTREE_UNLINK_PAGE_META_NUM);
    }
}

/*
 * Replay a new-root record that uses the BTREE_NEWROOT_* block numbering.
 *
 * The root page is rebuilt via UBTreeXlogNewrootOperatorPage, then (for a
 * non-leaf root only) the block registered as BTREE_NEWROOT_LEFT_BLOCK_NUM
 * is handed to UBTreeXlogClearIncompleteSplit, and finally the metapage is
 * restored from BTREE_NEWROOT_META_BLOCK_NUM.
 */
static void UBTreeXlogNewRootUpdate(XLogReaderState *record)
{
    xl_btree_newroot *newroot = (xl_btree_newroot *)XLogRecGetData(record);

    /* Step 1: rebuild the root page from the block payload. */
    RedoBufferInfo rootBuf;
    XLogInitBufferForRedo(record, BTREE_NEWROOT_ORIG_BLOCK_NUM, &rootBuf);

    Size payloadLen;
    char *payload = XLogRecGetBlockData(record, BTREE_NEWROOT_ORIG_BLOCK_NUM, &payloadLen);

    /* Output slot required by the operator; its value is not used afterwards. */
    BlockNumber downlinkBlk = 0;
    UBTreeXlogNewrootOperatorPage(&rootBuf, (void *)newroot, (void *)payload, payloadLen, &downlinkBlk);

    MarkBufferDirty(rootBuf.buf);
    UnlockReleaseBuffer(rootBuf.buf);

    /*
     * Step 2: only a non-leaf root has a left child to touch.  The buffer is
     * pre-set to InvalidBuffer because the read is skipped for a leaf root.
     */
    RedoBufferInfo leftBuf;
    leftBuf.buf = InvalidBuffer;
    if (newroot->level > 0) {
        if (XLogReadBufferForRedo(record, BTREE_NEWROOT_LEFT_BLOCK_NUM, &leftBuf) == BLK_NEEDS_REDO) {
            UBTreeXlogClearIncompleteSplit<UBTPageOpaqueInternal>(&leftBuf);
            MarkBufferDirty(leftBuf.buf);
        }
    }
    if (BufferIsValid(leftBuf.buf)) {
        UnlockReleaseBuffer(leftBuf.buf);
    }

    /* Step 3: the metapage always accompanies a new root. */
    UBTreeRestoreMeta(record, BTREE_NEWROOT_META_BLOCK_NUM);
}

/*
 * Replay a new-root record.
 *
 * When issplitupgrade is set the work is delegated to
 * UBTreeXlogNewRootUpdate.  Otherwise the root (block 0 of the record) is
 * re-initialized here, a matching pending split is forgotten for a non-leaf
 * root, and the metapage is restored from block 1.
 */
static void UBTreeXlogNewRoot(XLogReaderState *record, bool issplitupgrade)
{
    if (issplitupgrade) {
        UBTreeXlogNewRootUpdate(record);
        return;
    }

    XLogRecPtr endLsn = record->EndRecPtr;
    xl_btree_newroot *newroot = (xl_btree_newroot *)XLogRecGetData(record);

    RelFileNode indexNode;
    XLogRecGetBlockTag(record, 0, &indexNode, NULL, NULL);

    RedoBufferInfo rootBuf;
    XLogInitBufferForRedo(record, 0, &rootBuf);
    Page rootPage = rootBuf.pageinfo.page;

    /* Start from an empty page and fill in the special area for a root. */
    UBTreePageInit(rootPage, rootBuf.pageinfo.pagesize);
    UBTPageOpaqueInternal special = (UBTPageOpaqueInternal)PageGetSpecialPointer(rootPage);
    special->btpo_prev = P_NONE;
    special->btpo_next = P_NONE;
    special->btpo.level = newroot->level;
    special->btpo_cycleid = 0;
    special->btpo_flags = BTP_ROOT;
    if (newroot->level == 0) {
        special->btpo_flags |= BTP_LEAF;
    }

    BlockNumber rightChild = 0;
    if (newroot->level > 0) {
        Size payloadLen;
        char *payload = XLogRecGetBlockData(record, 0, &payloadLen);
        UBTreeRestorePage(rootPage, payload, payloadLen);

        /* The item at P_FIRSTKEY carries the downlink to the right-hand split page. */
        IndexTuple firstKey = (IndexTuple)PageGetItem(rootPage, PageGetItemId(rootPage, P_FIRSTKEY));
        rightChild = ItemPointerGetBlockNumber(&(firstKey->t_tid));
        Assert(ItemPointerGetOffsetNumber(&(firstKey->t_tid)) == P_HIKEY);
    }

    PageSetLSN(rootPage, endLsn);
    MarkBufferDirty(rootBuf.buf);
    UnlockReleaseBuffer(rootBuf.buf);

    /* A non-leaf root may complete a split that is still tracked as pending. */
    if (newroot->level > 0) {
        ForgetMatchingSplit(&indexNode, rightChild, true);
    }
    UBTreeRestoreMeta(record, 1);
}

/*
 * Decode the undo payload embedded in a ubtree3 insert/delete WAL record and,
 * when requested, re-create the corresponding undo record.
 *
 * record     - WAL record being replayed
 * xlrec      - fixed insert/delete header of the record (offNum, curXid,
 *              prevXidOfTuple are read from it)
 * itup       - index tuple carried by the record
 * recordPtr  - start of the undo payload: an XlUndoHeader, a UBTreeUndoInfo,
 *              then the optional fields selected by xlundohdr->flag, then an
 *              undo::XlogUndoMeta
 * tryPrepare - when false the payload is only parsed and the undo pointer
 *              stored in the header is returned unchanged
 * isInsert   - selects UBTreePCRPrepareUndoInsert (true) or
 *              UBTreePCRPrepareUndoDelete (false)
 *
 * Returns the undo record pointer: the value from the header when tryPrepare
 * is false, otherwise the value returned by the prepare routine.
 */
static UndoRecPtr PrepareUndoRecordForRedo(XLogReaderState *record,
    xl_ubtree3_insert_or_delete *xlrec, IndexTuple itup, char *recordPtr, const bool tryPrepare, bool isInsert)
{
    XLogRecPtr lsn = record->EndRecPtr;
    UndoRecPtr blkprev = INVALID_UNDO_REC_PTR;
    UndoRecPtr prevurp = INVALID_UNDO_REC_PTR;
    Oid partitionOid = InvalidOid;
    undo::XlogUndoMeta undometa;
    /* Fixed part of the payload: undo header followed by the ubtree undo info. */
    XlUndoHeader *xlundohdr = (XlUndoHeader *)recordPtr;
    recordPtr += SizeOfXLUndoHeader;
    UBTreeUndoInfo uinfo = (UBTreeUndoInfo)recordPtr;
    recordPtr += SizeOfUBTreeUndoInfoData;

    /* Optional fields appear in this fixed order, each only if its flag bit is set. */
    if ((xlundohdr->flag & UBTREE_XLOG_HAS_BLK_PREV) != 0) {
        blkprev = *((UndoRecPtr *)recordPtr);
        recordPtr += sizeof(UndoRecPtr);
    }
    if ((xlundohdr->flag & UBTREE_XLOG_HAS_XACT_PREV) != 0) {
        prevurp = *((UndoRecPtr *)recordPtr);
        recordPtr += sizeof(UndoRecPtr);
    }
    if ((xlundohdr->flag & UBTREE_XLOG_HAS_PARTITION_OID) != 0) {
        partitionOid = *((Oid *)recordPtr);
        recordPtr += sizeof(Oid);
    }

    undo::XlogUndoMeta *xlundometa = (undo::XlogUndoMeta *)((char *)recordPtr);
    UndoRecPtr urecptr = xlundohdr->urecptr;

    /* copy xlundometa to local struct */
    CopyUndoMeta(*xlundometa, undometa);
    recordPtr += undometa.Size();

    if (tryPrepare) {
        /* When the undo insert is to be skipped, the meta is flagged and no record is inserted below. */
        bool skipInsert = undo::IsSkipInsertUndo(urecptr);
        if (skipInsert) {
            undometa.SetInfo(XLOG_UNDOMETA_INFO_SKIP);
        }

        /* recover undo record */
        UndoRecord *undorec = (*t_thrd.ustore_cxt.urecvec)[0];
        undorec->SetUrp(urecptr);
        undorec->SetOffset(xlrec->offNum);
        undorec->SetBlkprev(blkprev);
        undorec->SetOldXactId(xlrec->prevXidOfTuple);

        /* We need to pass in tablespace and relfilenode in PrepareUndo but we never explicitly
         * wrote those information in the xlundohdr because we can grab them from the XLOG record itself.
         */
        RelFileNode targetNode = { 0 };
        BlockNumber blkno = InvalidBlockNumber;
        bool res PG_USED_FOR_ASSERTS_ONLY = XLogRecGetBlockTag(record, 0, &targetNode, NULL, &blkno);
        Assert(res);

        if (isInsert) {
            urecptr = UBTreePCRPrepareUndoInsert(xlundohdr->relOid, partitionOid, targetNode.relNode,
                targetNode.spcNode, UNDO_PERMANENT, xlrec->curXid, FirstCommandId,
                blkprev, prevurp, blkno, xlundohdr, &undometa, InvalidOffsetNumber, InvalidBuffer,
                xlrec->prevXidOfTuple, uinfo, itup);
        } else {
            urecptr = UBTreePCRPrepareUndoDelete(xlundohdr->relOid, partitionOid, targetNode.relNode,
                targetNode.spcNode, UNDO_PERMANENT, InvalidBuffer, xlrec->curXid, 0, prevurp,
                itup, blkno, xlundohdr, &undometa, uinfo);
        }

        /* The re-prepared record is expected at the same undo offset that was logged. */
        Assert(UNDO_PTR_GET_OFFSET(urecptr) == UNDO_PTR_GET_OFFSET(xlundohdr->urecptr));
        if (!skipInsert) {
            /* Insert the Undo record into the undo store */
            InsertPreparedUndo(t_thrd.ustore_cxt.urecvec, lsn);
        }
        /* Undo meta is replayed with the logged pointer, whether or not the record was inserted. */
        undo::RedoUndoMeta(record, &undometa, xlundohdr->urecptr, t_thrd.ustore_cxt.urecvec->LastRecord(),
            t_thrd.ustore_cxt.urecvec->LastRecordSize());
        UndoRecordVerify(undorec);
        UHeapResetPreparedUndo();
    }

    return urecptr;
}

bool IsUBTreeVacuum(const XLogReaderState *record)
{
    uint8 info = (XLogRecGetInfo(record) & (~XLR_INFO_MASK));

    if (XLogRecGetRmid(record) == RM_UBTREE_ID) {
        if ((info == XLOG_UBTREE_REUSE_PAGE) || (info == XLOG_UBTREE_VACUUM) || (info == XLOG_UBTREE_DELETE) ||
            (info == XLOG_UBTREE_UNLINK_PAGE) || (info == XLOG_UBTREE_UNLINK_PAGE_META) ||
            (info == XLOG_UBTREE_MARK_PAGE_HALFDEAD)) {
            return true;
        }
    }

    return false;
}

/*
 * Replay a reuse-page record.
 *
 * No page is modified.  The record only serves as a recovery-conflict point
 * for pages recycled through the FSM: latestRemovedXid was the page's
 * btpo.xact, so the same XID gives the same exclusion effect on master and
 * standby (the btpo.xact < RecentGlobalXmin test in _bt_page_recyclable()
 * mirrors the pgxact->xmin > limitXmin test in
 * GetConflictingVirtualXIDs()).
 */
static void UBTreeXlogReusePage(XLogReaderState *record)
{
    xl_btree_reuse_page *reuseRec = (xl_btree_reuse_page *)XLogRecGetData(record);

    RelFileNode indexNode;
    RelFileNodeCopy(indexNode, reuseRec->node, XLogRecGetBucketId(record));

    /* Conflicts are resolved only in hot standby, when supported, and outside EXRTO read. */
    if (!InHotStandby || !g_supportHotStandby || IS_EXRTO_READ) {
        return;
    }

    XLogRecPtr readLsn = record->ReadRecPtr;
    ResolveRecoveryConflictWithSnapshot(reuseRec->latestRemovedXid, indexNode, readLsn);
}

/*
 * Replay a mark-delete record: apply UBTreeXlogMarkDeleteOperatorPage to
 * block 0 when that block needs redo, then release the buffer.
 */
static void UBTreeXlogMarkDelete(XLogReaderState* record)
{
    RedoBufferInfo pageBuf;
    pageBuf.buf = InvalidBuffer;

    bool needsRedo = (XLogReadBufferForRedo(record, 0, &pageBuf) == BLK_NEEDS_REDO);
    if (needsRedo) {
        UBTreeXlogMarkDeleteOperatorPage(&pageBuf, XLogRecGetData(record));
        MarkBufferDirty(pageBuf.buf);
    }

    if (!BufferIsValid(pageBuf.buf)) {
        return;
    }
    UnlockReleaseBuffer(pageBuf.buf);
}

/*
 * Replay a prune-page record.
 *
 * In hot standby (outside EXRTO read) a valid latestRemovedXid is first used
 * to resolve recovery conflicts; afterwards the prune itself is applied to
 * block 0 if that block needs redo.
 */
static void UBTreeXlogPrunePage(XLogReaderState* record)
{
    xl_ubtree_prune_page *pruneRec = (xl_ubtree_prune_page*)XLogRecGetData(record);

    RelFileNode indexNode;
    bool tagFound = XLogRecGetBlockTag(record, 0, &indexNode, NULL, NULL);
    if (!tagFound) {
        /* Caller specified a bogus block_id */
        ereport(PANIC, (errmsg("failed to locate backup block with ID %d", 0)));
    }

    if (InHotStandby && TransactionIdIsValid(pruneRec->latestRemovedXid) && !IS_EXRTO_READ) {
        XLogRecPtr readLsn = record->ReadRecPtr;
        ResolveRecoveryConflictWithSnapshot(pruneRec->latestRemovedXid, indexNode, readLsn);
    }

    RedoBufferInfo pageBuf;
    pageBuf.buf = InvalidBuffer;
    bool needsRedo = (XLogReadBufferForRedo(record, 0, &pageBuf) == BLK_NEEDS_REDO);
    if (needsRedo) {
        UBTreeXlogPrunePageOperatorPage(&pageBuf, XLogRecGetData(record));
        MarkBufferDirty(pageBuf.buf);
    }

    if (!BufferIsValid(pageBuf.buf)) {
        return;
    }
    UnlockReleaseBuffer(pageBuf.buf);
}

/*
 * Replay a shift-base record: apply UBTree2XlogShiftBaseOperatorPage to
 * block 0 when that block needs redo, then release the buffer.
 */
static void UBTree2XlogShiftBase(XLogReaderState* record)
{
    RedoBufferInfo pageBuf;
    pageBuf.buf = InvalidBuffer;

    bool needsRedo = (XLogReadBufferForRedo(record, 0, &pageBuf) == BLK_NEEDS_REDO);
    if (needsRedo) {
        UBTree2XlogShiftBaseOperatorPage(&pageBuf, XLogRecGetData(record));
        MarkBufferDirty(pageBuf.buf);
    }

    if (!BufferIsValid(pageBuf.buf)) {
        return;
    }
    UnlockReleaseBuffer(pageBuf.buf);
}

/*
 * Replay initialization of a recycle-queue page.
 *
 * The current page is always re-initialized.  When the record says a new
 * page is being inserted, the left and right adjacent pages are updated as
 * well; both are released before the current page.
 */
static void UBTree2XlogRecycleQueueInitPage(XLogReaderState *record)
{
    xl_ubtree2_recycle_queue_init_page *initRec = (xl_ubtree2_recycle_queue_init_page *)XLogRecGetData(record);

    RedoBufferInfo currBuf;
    XLogInitBufferForRedo(record, UBTREE2_RECYCLE_QUEUE_INIT_PAGE_CURR_BLOCK_NUM, &currBuf);
    UBTree2XlogRecycleQueueInitPageOperatorCurrPage(&currBuf, (void *)initRec);
    MarkBufferDirty(currBuf.buf);

    /* Nothing but the current page is involved unless a new page is being linked in. */
    if (!initRec->insertingNewPage) {
        UnlockReleaseBuffer(currBuf.buf);
        return;
    }

    /* Left neighbour (last argument of the operator is true). */
    RedoBufferInfo leftBuf;
    bool redoLeft =
        (XLogReadBufferForRedo(record, UBTREE2_RECYCLE_QUEUE_INIT_PAGE_LEFT_BLOCK_NUM, &leftBuf) == BLK_NEEDS_REDO);
    if (redoLeft) {
        UBTree2XlogRecycleQueueInitPageOperatorAdjacentPage(&leftBuf, (void *)initRec, true);
        MarkBufferDirty(leftBuf.buf);
    }

    /* Right neighbour (last argument of the operator is false). */
    RedoBufferInfo rightBuf;
    bool redoRight =
        (XLogReadBufferForRedo(record, UBTREE2_RECYCLE_QUEUE_INIT_PAGE_RIGHT_BLOCK_NUM, &rightBuf) == BLK_NEEDS_REDO);
    if (redoRight) {
        UBTree2XlogRecycleQueueInitPageOperatorAdjacentPage(&rightBuf, (void *)initRec, false);
        MarkBufferDirty(rightBuf.buf);
    }

    if (BufferIsValid(leftBuf.buf)) {
        UnlockReleaseBuffer(leftBuf.buf);
    }
    if (BufferIsValid(rightBuf.buf)) {
        UnlockReleaseBuffer(rightBuf.buf);
    }
    UnlockReleaseBuffer(currBuf.buf);
}

/*
 * Replay a recycle-queue endpoint change.  The current block is handled by
 * the "left page" operator and the next block by the "right page" operator;
 * both buffers stay held until both pages have been processed.
 */
static void UBTree2XlogRecycleQueueEndpoint(XLogReaderState *record)
{
    RedoBufferInfo currBuf;
    bool redoCurr =
        (XLogReadBufferForRedo(record, UBTREE2_RECYCLE_QUEUE_ENDPOINT_CURR_BLOCK_NUM, &currBuf) == BLK_NEEDS_REDO);
    if (redoCurr) {
        UBTree2XlogRecycleQueueEndpointOperatorLeftPage(&currBuf, (void *)XLogRecGetData(record));
        MarkBufferDirty(currBuf.buf);
    }

    RedoBufferInfo nextBuf;
    bool redoNext =
        (XLogReadBufferForRedo(record, UBTREE2_RECYCLE_QUEUE_ENDPOINT_NEXT_BLOCK_NUM, &nextBuf) == BLK_NEEDS_REDO);
    if (redoNext) {
        UBTree2XlogRecycleQueueEndpointOperatorRightPage(&nextBuf, (void *)XLogRecGetData(record));
        MarkBufferDirty(nextBuf.buf);
    }

    /* Release in the same order the pages were acquired. */
    if (BufferIsValid(currBuf.buf)) {
        UnlockReleaseBuffer(currBuf.buf);
    }
    if (BufferIsValid(nextBuf.buf)) {
        UnlockReleaseBuffer(nextBuf.buf);
    }
}

/*
 * Replay a recycle-queue modification: apply
 * UBTree2XlogRecycleQueueModifyOperatorPage to the record's single block
 * when it needs redo, then release the buffer.
 */
static void UBTree2XlogRecycleQueueModify(XLogReaderState *record)
{
    RedoBufferInfo queueBuf;

    bool needsRedo = (XLogReadBufferForRedo(record, UBTREE2_RECYCLE_QUEUE_MODIFY_BLOCK_NUM, &queueBuf) == BLK_NEEDS_REDO);
    if (needsRedo) {
        UBTree2XlogRecycleQueueModifyOperatorPage(&queueBuf, (void *)XLogRecGetData(record));
        MarkBufferDirty(queueBuf.buf);
    }

    if (!BufferIsValid(queueBuf.buf)) {
        return;
    }
    UnlockReleaseBuffer(queueBuf.buf);
}

/*
 * Replay a freeze record: apply UBTree2XlogFreezeOperatorPage to the
 * record's single block when it needs redo, then release the buffer.
 */
static void UBTree2XlogFreeze(XLogReaderState *record)
{
    RedoBufferInfo pageBuf;

    bool needsRedo = (XLogReadBufferForRedo(record, UBTREE2_FREEZE_BLOCK_NUM, &pageBuf) == BLK_NEEDS_REDO);
    if (needsRedo) {
        UBTree2XlogFreezeOperatorPage(&pageBuf, (void *)XLogRecGetData(record));
        MarkBufferDirty(pageBuf.buf);
    }

    if (!BufferIsValid(pageBuf.buf)) {
        return;
    }
    UnlockReleaseBuffer(pageBuf.buf);
}

/*
 * Redo entry point for the ubtree resource manager: dispatch on the record's
 * operation code (info bits outside XLR_INFO_MASK).  An unrecognized code is
 * reported with PANIC.
 */
void UBTreeRedo(XLogReaderState* record)
{
    const uint8 opcode = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
    /* Only meaningful for split records; forwarded to UBTreeXlogSplit. */
    const bool withOpaque = ((XLogRecGetInfo(record) & BTREE_SPLIT_OPAQUE_FLAG) != 0);
    /* Both upgrade switches are hard-wired on here. */
    const bool splitUpgrade = true;
    const bool deleteUpgrade = true;

    switch (opcode) {
        /* inserts: arguments are (isleaf, ismeta) */
        case XLOG_UBTREE_INSERT_LEAF:
            UBTreeXlogInsert(true, false, record, splitUpgrade);
            break;
        case XLOG_UBTREE_INSERT_UPPER:
            UBTreeXlogInsert(false, false, record, splitUpgrade);
            break;
        case XLOG_UBTREE_INSERT_META:
            UBTreeXlogInsert(false, true, record, splitUpgrade);
            break;

        /* splits: arguments are (onleft, isroot) */
        case XLOG_UBTREE_SPLIT_L:
            UBTreeXlogSplit(true, false, record, splitUpgrade, withOpaque);
            break;
        case XLOG_UBTREE_SPLIT_R:
            UBTreeXlogSplit(false, false, record, splitUpgrade, withOpaque);
            break;
        case XLOG_UBTREE_SPLIT_L_ROOT:
            UBTreeXlogSplit(true, true, record, splitUpgrade, withOpaque);
            break;
        case XLOG_UBTREE_SPLIT_R_ROOT:
            UBTreeXlogSplit(false, true, record, splitUpgrade, withOpaque);
            break;

        case XLOG_UBTREE_VACUUM:
            UBTreeXlogVacuum(record);
            break;
        case XLOG_UBTREE_DELETE:
            UBTreeXlogDelete(record);
            break;

        /* page deletion: the upgraded path splits it into unlink / half-dead steps */
        case XLOG_UBTREE_UNLINK_PAGE:
        case XLOG_UBTREE_UNLINK_PAGE_META:
            if (deleteUpgrade) {
                UBTreeXlogUnlinkPage(opcode, record);
            } else {
                UBTreeXlogDeletePage(opcode, record);
            }
            break;
        case XLOG_UBTREE_MARK_PAGE_HALFDEAD:
            if (deleteUpgrade) {
                UBTreeXlogMarkPageHalfDead(opcode, record);
            } else {
                UBTreeXlogDeletePage(opcode, record);
            }
            break;

        case XLOG_UBTREE_NEWROOT:
            UBTreeXlogNewRoot(record, splitUpgrade);
            break;
        case XLOG_UBTREE_REUSE_PAGE:
            UBTreeXlogReusePage(record);
            break;
        case XLOG_UBTREE_MARK_DELETE:
            UBTreeXlogMarkDelete(record);
            break;
        case XLOG_UBTREE_PRUNE_PAGE:
            UBTreeXlogPrunePage(record);
            break;
        default:
            ereport(PANIC, (errmsg("UBTreeRedo: unknown op code %hhu", opcode)));
    }
}

/*
 * Rebuild the metapage registered under block_id from the record's block
 * payload via UBTree3RestoreMetaOperatorPage.  The buffer is dirtied and
 * released only when it is valid.
 */
static void UBTree3RestoreMeta(XLogReaderState *record, uint8 block_id)
{
    RedoBufferInfo metaBuf;
    XLogInitBufferForRedo(record, block_id, &metaBuf);

    Size payloadLen;
    char *payload = XLogRecGetBlockData(record, block_id, &payloadLen);
    UBTree3RestoreMetaOperatorPage(&metaBuf, (void *)payload, payloadLen);

    if (!BufferIsValid(metaBuf.buf)) {
        return;
    }
    MarkBufferDirty(metaBuf.buf);
    UnlockReleaseBuffer(metaBuf.buf);
}

/*
 * Replay an insert into an internal (non-leaf) PCR page.
 *
 * record  - WAL record being replayed
 * hasMeta - true when the record also carries a metapage image that must be
 *           restored from UBTREE3_INSERT_PCR_META_BLOCK_NUM
 *
 * Three steps, each locking one page at a time:
 *   1. the block registered as UBTREE3_INSERT_PCR_LEAF_BLOCK_NUM is passed to
 *      UBTreeXlogClearIncompleteSplit;
 *   2. the tuple stored as block data is added to the internal page at
 *      xlrec->offnum;
 *   3. the metapage is restored when hasMeta is set.
 */
static void UBTree3XlogInsertPcrInternal(XLogReaderState* record, bool hasMeta)
{
    RedoBufferInfo lbuf;
    RedoBufferInfo cbuf;

    /* Step 1: child page. */
    if (XLogReadBufferForRedo(record, UBTREE3_INSERT_PCR_LEAF_BLOCK_NUM, &cbuf) == BLK_NEEDS_REDO) {
        UBTreeXlogClearIncompleteSplit<UBTPCRPageOpaque>(&cbuf);
        if (BufferIsValid(cbuf.buf)) {
            MarkBufferDirty(cbuf.buf);
        }
    }
    if (BufferIsValid(cbuf.buf)) {
        UnlockReleaseBuffer(cbuf.buf);
    }

    /* Step 2: internal page receiving the new tuple. */
    if (XLogReadBufferForRedo(record, UBTREE3_INSERT_PCR_INTERNAL_BLOCK_NUM, &lbuf) == BLK_NEEDS_REDO) {
        Size dataLen;
        char *dataPos = XLogRecGetBlockData(record, UBTREE3_INSERT_PCR_INTERNAL_BLOCK_NUM, &dataLen);
        xl_btree_insert *xlrec = (xl_btree_insert *)XLogRecGetData(record);
        Assert(xlrec->offnum != InvalidOffsetNumber);
        Page page = lbuf.pageinfo.page;

        /*
         * The whole block payload is the tuple to insert.  The message names
         * what is really being added: the previous text was copied from the
         * split path and wrongly mentioned the left page's high key.
         */
        if (UBTPCRPageAddItem(page, (Item)dataPos, dataLen, xlrec->offnum, false) == InvalidOffsetNumber) {
            ereport(PANIC, (errcode(ERRCODE_INDEX_CORRUPTED),
                    errmsg("failed to add new item to internal page while redo.")));
        }
        PageSetLSN(page, lbuf.lsn);
        if (BufferIsValid(lbuf.buf)) {
            MarkBufferDirty(lbuf.buf);
        }
    }
    if (BufferIsValid(lbuf.buf)) {
        UnlockReleaseBuffer(lbuf.buf);
    }

    /* Step 3: optional metapage. */
    if (hasMeta) {
        UBTree3RestoreMeta(record, UBTREE3_INSERT_PCR_META_BLOCK_NUM);
    }
}

/*
 * Replay a PCR leaf insert.
 *
 * isDup - when true no new tuple is added; the existing row pointer at
 *         xlrec->offNum is switched back to LP_NORMAL and re-attached to the
 *         TD slot instead.
 *
 * The undo part is handled first by PrepareUndoRecordForRedo.  A worker that
 * only replays undo stops right after that; otherwise the page is redone.
 */
static void UBTree3XlogInsert(XLogReaderState* record, bool isDup)
{
    xl_ubtree3_insert_or_delete *xlrec = (xl_ubtree3_insert_or_delete *)XLogRecGetData(record);

    /* Main data layout: fixed header, the index tuple, then the undo payload. */
    IndexTuple itup = (IndexTuple)((char *)xlrec + SizeOfUbtree3InsertOrDelete);
    char *undoData = (char *)itup + IndexTupleSize(itup);

    bool fullReplay = !AmPageRedoWorker() || !SUPPORT_USTORE_UNDO_WORKER;
    bool undoOnly = fullReplay ? false : parallel_recovery::DoPageRedoWorkerReplayUndo();
    UndoRecPtr urec = PrepareUndoRecordForRedo(record, xlrec, itup, undoData, (fullReplay || undoOnly), true);

    /* undoOnly can only be true when fullReplay is false, so this alone decides page redo. */
    if (undoOnly) {
        return;
    }

    RedoBufferInfo leafBuf;
    if (XLogReadBufferForRedo(record, BTREE_INSERT_ORIG_BLOCK_NUM, &leafBuf) == BLK_NEEDS_REDO) {
        Page page = leafBuf.pageinfo.page;

        if (!isDup) {
            Size tupleSize = MAXALIGN(IndexTupleSize(itup));
            bool added = UBTreePCRPageAddTuple(page, tupleSize, NULL, itup, xlrec->offNum, false, xlrec->tdId);
            if (!added) {
                ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED), errmsg("failed to add new item to block")));
            }
        }

        /* Bookkeeping common to both variants: live-tuple count and TD slot owner. */
        UBTreeItemId rowPtr = UBTreePCRGetRowPtr(page, xlrec->offNum);
        UBTPCRPageOpaque special = (UBTPCRPageOpaque)PageGetSpecialPointer(page);
        UBTreeTD tdSlot = UBTreePCRGetTD(page, xlrec->tdId);
        special->activeTupleCount++;
        tdSlot->setInfo(xlrec->curXid, urec);

        if (isDup) {
            rowPtr->lp_flags = LP_NORMAL;
            UBTreePCRSetIndexTupleTDSlot(rowPtr, xlrec->tdId);
            UBTreePCRClearIndexTupleTDInvalid(rowPtr);
            UBTreePCRClearIndexTupleDeleted(rowPtr);
        }

        PageSetLSN(page, leafBuf.lsn);
        if (BufferIsValid(leafBuf.buf)) {
            MarkBufferDirty(leafBuf.buf);
        }
    }

    if (BufferIsValid(leafBuf.buf)) {
        UnlockReleaseBuffer(leafBuf.buf);
    }
}

/*
 * Replay creation of a new PCR root page.
 *
 * Block 0 is re-initialized as the root.  A leaf root (level 0) only gets
 * its TD area initialized; a non-leaf root receives the two tuples stored
 * back to back in the block payload, at P_HIKEY and P_FIRSTKEY.  For a
 * non-leaf root, block 1 is then passed to UBTreeXlogClearIncompleteSplit.
 * The metapage is restored last.
 */
static void UBTree3XlogNewRoot(XLogReaderState* record)
{
    xl_btree_newroot *newroot = (xl_btree_newroot *)XLogRecGetData(record);

    RedoBufferInfo rootBuf;
    XLogInitBufferForRedo(record, 0, &rootBuf);

    Size payloadLen;
    char *payload = XLogRecGetBlockData(record, 0, &payloadLen);

    Page rootPage = rootBuf.pageinfo.page;
    UBTreePCRPageInit(rootPage, rootBuf.pageinfo.pagesize);

    UBTPCRPageOpaque special = (UBTPCRPageOpaque)PageGetSpecialPointer(rootPage);
    special->btpo_flags = BTP_ROOT;
    special->btpo.level = newroot->level;
    special->btpo_cycleid = 0;

    if (newroot->level == 0) {
        special->btpo_flags |= BTP_LEAF;
        UBTreePCRInitTD(rootPage);
    } else {
        special->td_count = 0;

        /* The second tuple starts right where the first one ends. */
        Item firstItem = (Item)payload;
        Size firstItemSz = IndexTupleSize(firstItem);
        Item secondItem = (Item)(payload + firstItemSz);
        Size secondItemSz = IndexTupleSize(secondItem);

        OffsetNumber placed = UBTPCRPageAddItem(rootPage, firstItem, firstItemSz, P_HIKEY, false);
        if (placed == InvalidOffsetNumber) {
            ereport(PANIC, (errcode(ERRCODE_INDEX_CORRUPTED),
                errmsg("faild to add left P_HIKEY wihle redo.")));
        }
        placed = UBTPCRPageAddItem(rootPage, secondItem, secondItemSz, P_FIRSTKEY, false);
        if (placed == InvalidOffsetNumber) {
            ereport(PANIC, (errcode(ERRCODE_INDEX_CORRUPTED),
                errmsg("faild to add left P_FIRSTKEY wihle redo.")));
        }
    }
    PageSetLSN(rootPage, rootBuf.lsn);

    if (BufferIsValid(rootBuf.buf)) {
        MarkBufferDirty(rootBuf.buf);
        UnlockReleaseBuffer(rootBuf.buf);
    }

    /* Pre-set to InvalidBuffer: the read below is skipped entirely for a leaf root. */
    RedoBufferInfo childBuf;
    childBuf.buf = InvalidBuffer;
    if (newroot->level > 0) {
        if (XLogReadBufferForRedo(record, 1, &childBuf) == BLK_NEEDS_REDO) {
            UBTreeXlogClearIncompleteSplit<UBTPCRPageOpaque>(&childBuf);
            if (BufferIsValid(childBuf.buf)) {
                MarkBufferDirty(childBuf.buf);
            }
        }
    }
    if (BufferIsValid(childBuf.buf)) {
        UnlockReleaseBuffer(childBuf.buf);
    }

    UBTree3RestoreMeta(record, BTREE_NEWROOT_META_BLOCK_NUM);
}

/*
 * Replay a PCR prune-page record.
 *
 * The conflict horizon is whichever of latestFrozenXid / latestRemovedXid
 * follows the other.  In hot standby (when supported) a valid horizon is
 * used to resolve recovery conflicts before the page itself is redone.
 */
static void UBTree3XlogPrunePage(XLogReaderState* record)
{
    xl_ubtree3_prune_page *pruneRec = (xl_ubtree3_prune_page*)XLogRecGetData(record);

    RelFileNode indexNode;
    bool tagFound = XLogRecGetBlockTag(record, 0, &indexNode, NULL, NULL);
    if (!tagFound) {
        /* Caller specified a bogus block_id */
        ereport(PANIC, (errmsg("failed to locate backup block with ID %d", 0)));
    }

    /* Pick the later of the two XIDs carried by the record. */
    TransactionId conflictXid = pruneRec->latestRemovedXid;
    if (TransactionIdFollows(pruneRec->latestFrozenXid, pruneRec->latestRemovedXid)) {
        conflictXid = pruneRec->latestFrozenXid;
    }

    if (InHotStandby && TransactionIdIsValid(conflictXid) && g_supportHotStandby) {
        ResolveRecoveryConflictWithSnapshot(conflictXid, indexNode);
    }

    RedoBufferInfo pageBuf;
    pageBuf.buf = InvalidBuffer;
    bool needsRedo = (XLogReadBufferForRedo(record, 0, &pageBuf) == BLK_NEEDS_REDO);
    if (needsRedo) {
        UBTree3XlogPrunePageOperatorPage(&pageBuf, XLogRecGetData(record), indexNode.relNode);
        if (BufferIsValid(pageBuf.buf)) {
            MarkBufferDirty(pageBuf.buf);
        }
    }

    if (!BufferIsValid(pageBuf.buf)) {
        return;
    }
    UnlockReleaseBuffer(pageBuf.buf);
}

/*
 * Replay a PCR index-tuple delete.
 *
 * The record's main data is laid out as: fixed insert/delete header, the
 * index tuple, then the undo payload.  The undo part is replayed first via
 * PrepareUndoRecordForRedo; the page (block 0) is redone afterwards unless
 * this worker is restricted to undo replay.
 *
 * Cleanup relative to the earlier version: the pointer to the tuple was held
 * in a variable typed and named as an undo header, which it is not, and the
 * block tag was looked up into locals that were never read.  Both are gone;
 * the parsed offsets and the redo sequence are unchanged.
 */
static void UBTree3XlogDelete(XLogReaderState* record)
{
    xl_ubtree3_insert_or_delete *xlrec = (xl_ubtree3_insert_or_delete *)XLogRecGetData(record);

    /* The index tuple immediately follows the fixed header; undo data follows the tuple. */
    IndexTuple itup = (IndexTuple)((char *)xlrec + SizeOfUbtree3InsertOrDelete);
    char *recordPtr = (char *)xlrec + SizeOfUbtree3InsertOrDelete + IndexTupleSize(itup);

    bool allReplay = !AmPageRedoWorker() || !SUPPORT_USTORE_UNDO_WORKER;
    bool onlyReplayUndo = allReplay ? false : parallel_recovery::DoPageRedoWorkerReplayUndo();

    UndoRecPtr urecptr = PrepareUndoRecordForRedo(record, xlrec, itup, recordPtr,
        (allReplay || onlyReplayUndo), false);

    if (allReplay || !onlyReplayUndo) {
        RedoBufferInfo buffer = { 0 };
        if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) {
            UBTree3XlogDeleteOperatorPage(&buffer, XLogRecGetData(record), urecptr);
            if (BufferIsValid(buffer.buf)) {
                MarkBufferDirty(buffer.buf);
            }
        }

        if (BufferIsValid(buffer.buf)) {
            UnlockReleaseBuffer(buffer.buf);
        }
    }
}

/*
 * Replay freezing of TD slots on a PCR page.
 *
 * In hot standby (when supported) a valid latestFrozenXid is first used to
 * resolve recovery conflicts; then UBTree3XlogFreezeTdOperatorPage is applied
 * to block 0 if that block needs redo.
 */
static void UBTree3XlogFreezeTdSlot(XLogReaderState* record)
{
    xl_ubtree3_freeze_td_slot *freezeRec = (xl_ubtree3_freeze_td_slot *)XLogRecGetData(record);

    RelFileNode indexNode;
    bool tagFound = XLogRecGetBlockTag(record, 0, &indexNode, NULL, NULL);
    if (!tagFound) {
        /* Caller specified a bogus block_id */
        ereport(PANIC, (errmsg("failed to locate backup block with ID %d", 0)));
    }

    if (InHotStandby && TransactionIdIsValid(freezeRec->latestFrozenXid) && g_supportHotStandby) {
        ResolveRecoveryConflictWithSnapshot(freezeRec->latestFrozenXid, indexNode);
    }

    RedoBufferInfo pageBuf;
    bool needsRedo = (XLogReadBufferForRedo(record, 0, &pageBuf) == BLK_NEEDS_REDO);
    if (needsRedo) {
        UBTree3XlogFreezeTdOperatorPage(&pageBuf, (void *)XLogRecGetData(record));
        if (BufferIsValid(pageBuf.buf)) {
            MarkBufferDirty(pageBuf.buf);
        }
    }

    if (!BufferIsValid(pageBuf.buf)) {
        return;
    }
    UnlockReleaseBuffer(pageBuf.buf);
}

/*
 * Replay reuse of a TD slot: apply UBTree3XlogReuseTdOperatorPage to block 0
 * when that block needs redo, then release the buffer.
 */
static void UBTree3XlogReuseTdSlot(XLogReaderState* record)
{
    RedoBufferInfo pageBuf;

    bool needsRedo = (XLogReadBufferForRedo(record, 0, &pageBuf) == BLK_NEEDS_REDO);
    if (needsRedo) {
        UBTree3XlogReuseTdOperatorPage(&pageBuf, (void *)XLogRecGetData(record));
        if (BufferIsValid(pageBuf.buf)) {
            MarkBufferDirty(pageBuf.buf);
        }
    }

    if (!BufferIsValid(pageBuf.buf)) {
        return;
    }
    UnlockReleaseBuffer(pageBuf.buf);
}

/*
 * Replay extension of the TD slot area: apply
 * UBTree3XlogExtendTdOperatorPage to block 0 when that block needs redo,
 * then release the buffer.
 */
static void UBTree3XlogExtendTdSlots(XLogReaderState* record)
{
    RedoBufferInfo pageBuf;

    bool needsRedo = (XLogReadBufferForRedo(record, 0, &pageBuf) == BLK_NEEDS_REDO);
    if (needsRedo) {
        UBTree3XlogExtendTdOperatorPage(&pageBuf, (void *)XLogRecGetData(record));
        if (BufferIsValid(pageBuf.buf)) {
            MarkBufferDirty(pageBuf.buf);
        }
    }

    if (!BufferIsValid(pageBuf.buf)) {
        return;
    }
    UnlockReleaseBuffer(pageBuf.buf);
}

/*
 * Replay a transaction rollback on a PCR page: apply
 * UBTree3XlogRollbackTxnOperatorPage to block 0 when that block needs redo,
 * then release the buffer.
 */
static void UBTree3XlogRollBackTxn(XLogReaderState* record)
{
    RedoBufferInfo pageBuf;

    bool needsRedo = (XLogReadBufferForRedo(record, 0, &pageBuf) == BLK_NEEDS_REDO);
    if (needsRedo) {
        UBTree3XlogRollbackTxnOperatorPage(&pageBuf, (void *)XLogRecGetData(record));
        if (BufferIsValid(pageBuf.buf)) {
            MarkBufferDirty(pageBuf.buf);
        }
    }

    if (!BufferIsValid(pageBuf.buf)) {
        return;
    }
    UnlockReleaseBuffer(pageBuf.buf);
}

/*
 * Rebuild the left half of a PCR page split during redo.
 *
 * bufferinfo - buffer holding the pre-split left page; rewritten in place
 * recorddata - the xl_ubtree3_split main record
 * onleft     - true when the newly inserted tuple landed on the left page
 * blkdata    - block payload: the new tuple first (only when onleft), then
 *              the left page's new high key; each is consumed at its
 *              MAXALIGN'ed size
 * datalen    - length of blkdata; asserted to be fully consumed
 *
 * A temporary page is filled with the high key plus every original item
 * below xlrec->firstRight (with the new tuple spliced in at
 * xlrec->newItemOff), then copied back over the original page.
 */
void UBTree3XlogSplitLeftPage(RedoBufferInfo *bufferinfo, void *recorddata, bool onleft, void *blkdata, Size datalen)
{
    xl_ubtree3_split *xlrec = (xl_ubtree3_split *)recorddata;
    Page lpage = bufferinfo->pageinfo.page;
    UBTPCRPageOpaque lopaque = (UBTPCRPageOpaque)PageGetSpecialPointer(lpage);
    char *dataPos = (char *)blkdata;
    /* item and size are only meaningful (and only read) when onleft is true. */
    IndexTuple item;
    Size size = 0;
    if (onleft) {
        item = (IndexTuple)dataPos;
        size = MAXALIGN(IndexTupleSize(item));
        dataPos += size;
        datalen -= size;
    }
    Item leftHighKey = (Item)dataPos;
    Size leftHighKeySz = MAXALIGN(IndexTupleSize(leftHighKey));
    dataPos += leftHighKeySz;
    datalen -= leftHighKeySz;
    Assert(datalen == 0);

    /* Work on a temporary page that starts with a copy of the special area. */
    START_CRIT_SECTION();
    Page lNewPage = PageGetTempPageCopySpecial(lpage);
    if (P_ISLEAF(lopaque)) {
        UBTreePCRCopyTDSlot(lpage, lNewPage);
    }
    END_CRIT_SECTION();
    /* leftOff tracks the next free offset on the temporary page. */
    OffsetNumber leftOff = P_HIKEY;
    if (UBTPCRPageAddItem(lNewPage, leftHighKey, leftHighKeySz, P_HIKEY, false) == InvalidOffsetNumber) {
        ereport(PANIC, (errcode(ERRCODE_INDEX_CORRUPTED),
                errmsg("faild to add left P_HIKEY wihle redo.")));
    }
    leftOff = OffsetNumberNext(leftOff);
    OffsetNumber off;
    /* Copy the original items that stay on the left, i.e. those before firstRight. */
    for (off = P_FIRSTDATAKEY(lopaque); off < xlrec->firstRight; off++) {
        /* The new tuple goes in just before the original item at newItemOff. */
        if (onleft && off == xlrec->newItemOff) {
            if (!UBTreePCRPageAddTuple(lNewPage, size, NULL, item, leftOff, false, xlrec->slotNo)) {
                ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED), errmsg("failed to add new item to block")));
            }
            leftOff = OffsetNumberNext(leftOff);
        }

        UBTreeItemId iid = UBTreePCRGetRowPtr(lpage, off);
        IndexTuple itup = UBTreePCRGetIndexTuple(lpage, off);
        Size itupSize = IndexTupleSize(itup);
        if (!UBTreePCRPageAddTuple(lNewPage, itupSize, iid, itup, leftOff, true, UBTreeInvalidTDSlotId)) {
            ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED), errmsg("failed to add new item to block")));
        }
        leftOff = OffsetNumberNext(leftOff);
    }
    /* Here off == firstRight: covers the new tuple being the last item on the left page. */
    if (onleft && off == xlrec->newItemOff) {
        if (!UBTreePCRPageAddTuple(lNewPage, size,  NULL, item, leftOff, false, xlrec->slotNo)) {
            ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED), errmsg("failed to add new item to block")));
        }
        leftOff = OffsetNumberNext(leftOff);
    }
    /* Record the inserting transaction and its undo pointer in the TD slot used by the new tuple. */
    if (onleft && xlrec->slotNo != UBTreeInvalidTDSlotId) {
        UBTreeTD td = UBTreePCRGetTD(lNewPage, xlrec->slotNo);
        td->setInfo(xlrec->fxid, xlrec->urp);
    }
    PageRestoreTempPage(lNewPage, lpage);
    /* The special area is overwritten wholesale with the copy logged in the record. */
    UBTPCRPageOpaque opaque = (UBTPCRPageOpaque)PageGetSpecialPointer(lpage);
    *opaque = xlrec->letfPcrOpq;
    PageSetLSN(lpage, bufferinfo->lsn);
}

/*
 * Replay a PCR page split.
 *
 * record - WAL record being replayed
 * onLeft - true when the newly inserted tuple went to the left page
 * isRoot - not referenced in this function
 *
 * Sequence: replay the undo part (only when the record carries a TD slot),
 * stop there for an undo-only worker, then process the child page (non-leaf
 * splits only), the right page, the left page, and finally the left-link of
 * the page to the right of the new right page, if there is one.
 */
static void UBTree3XlogSplit(XLogReaderState* record, bool onLeft, bool isRoot)
{
    RelFileNode rnode;
    BlockNumber leftSib;
    BlockNumber rightSib;
    BlockNumber rnext;
    /* rnode and leftSib are fetched here but not read again below. */
    (void) XLogRecGetBlockTag(record, BTREE_SPLIT_LEFT_BLOCK_NUM, &rnode, NULL, &leftSib);
    (void) XLogRecGetBlockTag(record, BTREE_SPLIT_RIGHT_BLOCK_NUM, NULL, NULL, &rightSib);
    /* No right-next block reference means the new right page is the rightmost one. */
    if (!XLogRecGetBlockTag(record, BTREE_SPLIT_RIGHTNEXT_BLOCK_NUM, NULL, NULL, &rnext)) {
        rnext = P_NONE;
    }
    /*
     * NOTE(review): despite its name, replayRedoOnly == true makes this
     * function prepare undo and then return before touching any page, i.e.
     * it behaves as an "undo only" flag.
     */
    bool replayAll = !AmPageRedoWorker() || !SUPPORT_USTORE_UNDO_WORKER;
    bool replayRedoOnly = replayAll ? false : parallel_recovery::DoPageRedoWorkerReplayUndo();

    char *rec = (char *)XLogRecGetData(record);
    xl_ubtree3_split *xlrec = (xl_ubtree3_split *)rec;
    rec += SizeOfUbtree3Split;
    /* With a valid TD slot, the split header is followed by an insert header, the tuple and undo data. */
    if (xlrec->slotNo != UBTreeInvalidTDSlotId) {
        xl_ubtree3_insert_or_delete *xlrecInsert = (xl_ubtree3_insert_or_delete* )rec;
        rec += SizeOfUbtree3InsertOrDelete;
        IndexTuple itup = (IndexTuple)rec;
        rec += IndexTupleSize(itup);
        /* The undo pointer is stored back into the record so the left-page rebuild can read it. */
        xlrec->urp = PrepareUndoRecordForRedo(record, xlrecInsert, itup, rec, (replayAll || replayRedoOnly), true);
    }

    if (replayRedoOnly) {
        return;
    }

    /* Non-leaf split: the child page is passed to UBTreeXlogClearIncompleteSplit. */
    if (xlrec->level > 0) {
        RedoBufferInfo cbuf;
        if (XLogReadBufferForRedo(record, BTREE_SPLIT_CHILD_BLOCK_NUM, &cbuf) == BLK_NEEDS_REDO) {
            UBTreeXlogClearIncompleteSplit<UBTPCRPageOpaque>(&cbuf);
            if (BufferIsValid(cbuf.buf)) {
                MarkBufferDirty(cbuf.buf);
            }
        }
        if (BufferIsValid(cbuf.buf)) {
            UnlockReleaseBuffer(cbuf.buf);
        }
    }

    /*
     * Right page: rebuilt from the block payload, which holds the first
     * rightLower bytes of the page followed by the bytes that belong at the
     * end of the page.
     */
    RedoBufferInfo rbuf;
    Size dataLen;
    XLogInitBufferForRedo(record, BTREE_SPLIT_RIGHT_BLOCK_NUM, &rbuf);
    char *dataPos = XLogRecGetBlockData(record, BTREE_SPLIT_RIGHT_BLOCK_NUM, &dataLen);
    Page rpage = rbuf.pageinfo.page;
    UBTreePCRPageInit(rpage, rbuf.pageinfo.pagesize);
    errno_t rc = memcpy_s(rpage, xlrec->rightLower, dataPos, xlrec->rightLower);
    securec_check(rc, "\0", "\0");
    dataPos += xlrec->rightLower;
    /* Whatever remains of the payload is copied so that it ends at offset BLCKSZ. */
    Size upper = dataLen - xlrec->rightLower;
    rc = memcpy_s(rpage + BLCKSZ - upper, upper, dataPos, upper);
    securec_check(rc, "\0", "\0");
    PageSetLSN(rpage, rbuf.lsn);
    if (BufferIsValid(rbuf.buf)) {
        MarkBufferDirty(rbuf.buf);
    }

    /* Left page: rebuilt in place; the right buffer stays held until this is done. */
    RedoBufferInfo lbuf;
    if (XLogReadBufferForRedo(record, BTREE_SPLIT_LEFT_BLOCK_NUM, &lbuf) == BLK_NEEDS_REDO) {
        dataPos = XLogRecGetBlockData(record, BTREE_SPLIT_LEFT_BLOCK_NUM, &dataLen);
        UBTree3XlogSplitLeftPage(&lbuf, (void *)xlrec, onLeft, (void *)dataPos, dataLen);
        if (BufferIsValid(lbuf.buf)) {
            MarkBufferDirty(lbuf.buf);
        }
    }
    if (BufferIsValid(lbuf.buf)) {
        UnlockReleaseBuffer(lbuf.buf);
    }
    if (BufferIsValid(rbuf.buf)) {
        UnlockReleaseBuffer(rbuf.buf);
    }

    /* Page to the right of the new right page: point its left-link at the new right page. */
    RedoBufferInfo sbuf;
    if (rnext != P_NONE) {
        if (XLogReadBufferForRedo(record, BTREE_SPLIT_RIGHTNEXT_BLOCK_NUM, &sbuf) == BLK_NEEDS_REDO) {
            Page page = sbuf.pageinfo.page;
            UBTPCRPageOpaque opaque = (UBTPCRPageOpaque)PageGetSpecialPointer(page);
            opaque->btpo_prev = rightSib;
            PageSetLSN(page, sbuf.lsn);
            if (BufferIsValid(sbuf.buf)) {
                MarkBufferDirty(sbuf.buf);
            }
        }
        if (BufferIsValid(sbuf.buf)) {
            UnlockReleaseBuffer(sbuf.buf);
        }
    }
}

/*
 * Replay an unlink-page record.
 *
 * The record can reference up to five blocks, handled in this order: the
 * right sibling, the left sibling (only when the record names one), the
 * page being unlinked, an optional child page, and finally the metapage
 * when "info" is XLOG_UBTREE4_UNLINK_PAGE_META.
 *
 * info:   opcode already extracted by the caller.
 * record: the WAL record being replayed.
 */
static void UBTree4XlogUnlinkPage(uint8 info, XLogReaderState *record)
{
    xl_btree_unlink_page *unlinkRec = (xl_btree_unlink_page *)XLogRecGetData(record);
    /* Captured up front, before any page operator gets to see the record. */
    const BlockNumber leftNeighbor = unlinkRec->leftsib;

    /*
     * During normal operation every page touched by this record is locked
     * before any of them is modified.  Replay may take them one at a time:
     * nothing else is updating the index concurrently, and readers do not
     * care whether they land on the target page (it is surely empty).
     */

    /* Right sibling: repair its left-link. */
    RedoBufferInfo rightInfo;
    const bool redoRight =
        (XLogReadBufferForRedo(record, BTREE_UNLINK_PAGE_RIGHT_NUM, &rightInfo) == BLK_NEEDS_REDO);
    if (redoRight) {
        UBTree4XlogUnlinkPageOperatorRightpage(&rightInfo, unlinkRec);
        MarkBufferDirty(rightInfo.buf);
    }
    if (BufferIsValid(rightInfo.buf)) {
        UnlockReleaseBuffer(rightInfo.buf);
    }

    /* Left sibling, when there is one: repair its right-link. */
    if (leftNeighbor != P_NONE) {
        RedoBufferInfo leftInfo;
        const bool redoLeft =
            (XLogReadBufferForRedo(record, BTREE_UNLINK_PAGE_LEFT_NUM, &leftInfo) == BLK_NEEDS_REDO);
        if (redoLeft) {
            UBTree4XlogUnlinkPageOperatorLeftpage(&leftInfo, unlinkRec);
            MarkBufferDirty(leftInfo.buf);
        }
        if (BufferIsValid(leftInfo.buf)) {
            UnlockReleaseBuffer(leftInfo.buf);
        }
    }

    /* Target page: always re-initialized and rewritten as an empty deleted page. */
    RedoBufferInfo targetInfo;
    XLogInitBufferForRedo(record, BTREE_UNLINK_PAGE_CUR_PAGE_NUM, &targetInfo);
    UBTree4XlogUnlinkPageOperatorCurpage(&targetInfo, unlinkRec);
    if (BufferIsValid(targetInfo.buf)) {
        MarkBufferDirty(targetInfo.buf);
        UnlockReleaseBuffer(targetInfo.buf);
    }

    /*
     * When the unlinked page was a parent of the targeted leaf rather than
     * the leaf itself, the leaf must be pointed at the next remaining child
     * of the branch.  The leaf holds no real data, so it is rebuilt from
     * scratch out of the WAL record.
     */
    if (XLogRecHasBlockRef(record, BTREE_UNLINK_PAGE_CHILD_NUM)) {
        RedoBufferInfo childInfo;
        XLogInitBufferForRedo(record, BTREE_UNLINK_PAGE_CHILD_NUM, &childInfo);
        UBTree4XlogUnlinkPageOperatorChildpage(&childInfo, unlinkRec);
        if (BufferIsValid(childInfo.buf)) {
            MarkBufferDirty(childInfo.buf);
            UnlockReleaseBuffer(childInfo.buf);
        }
    }

    /* Metapage is restored only for the _META flavour of the record. */
    if (info == XLOG_UBTREE4_UNLINK_PAGE_META) {
        UBTree3RestoreMeta(record, BTREE_UNLINK_PAGE_META_NUM);
    }
}

/*
 * Replay a mark-page-half-dead record.
 *
 * Two blocks are involved: the parent page, which is replayed only when
 * XLogReadBufferForRedo reports BLK_NEEDS_REDO, and the leaf page.  The leaf
 * is either re-initialized (when its block reference carries
 * BKPBLOCK_WILL_INIT) or updated in place.
 *
 * record: the WAL record being replayed.
 */
static void UBTree4XlogMarkPageHalfDead(XLogReaderState *record)
{
    xl_btree_mark_page_halfdead *xlrec = (xl_btree_mark_page_halfdead *)XLogRecGetData(record);
    RedoBufferInfo pbuffer;

    /*
     * In normal operation, we would lock all the pages this WAL record
     * touches before changing any of them.  In WAL replay, it should be okay
     * to lock just one page at a time, since no concurrent index updates can
     * be happening, and readers should not care whether they arrive at the
     * target page or not (since it's surely empty).
     */

    /* parent page */
    if (XLogReadBufferForRedo(record, BTREE_HALF_DEAD_PARENT_PAGE_NUM, &pbuffer) == BLK_NEEDS_REDO) {
        UBTree4XlogHalfdeadPageOperatorParentpage(&pbuffer, xlrec);
        MarkBufferDirty(pbuffer.buf);
    }
    if (BufferIsValid(pbuffer.buf)) {
        UnlockReleaseBuffer(pbuffer.buf);
    }

    /* leaf page */
    RedoBufferInfo lbuffer;
    bool willInit = record->blocks[BTREE_HALF_DEAD_LEAF_PAGE_NUM].flags & BKPBLOCK_WILL_INIT;
    if (willInit) {
        /* The record asks for a fresh page: initialize it and apply unconditionally. */
        XLogInitBufferForRedo(record, BTREE_HALF_DEAD_LEAF_PAGE_NUM, &lbuffer);
        UBTree4XlogHalfdeadPageOperatorLeafpage(&lbuffer, xlrec, willInit);
        if (BufferIsValid(lbuffer.buf)) {
            MarkBufferDirty(lbuffer.buf);
            UnlockReleaseBuffer(lbuffer.buf);
        }
        return;
    }

    /*
     * Update the existing leaf in place.  The leaf must be fetched through
     * its own block reference (BTREE_HALF_DEAD_LEAF_PAGE_NUM), the same one
     * the willInit test and the init path above use.  Fetching the parent's
     * reference here would apply the leaf operator to the parent page, or
     * skip the leaf update altogether.
     */
    if (XLogReadBufferForRedo(record, BTREE_HALF_DEAD_LEAF_PAGE_NUM, &lbuffer) == BLK_NEEDS_REDO) {
        UBTree4XlogHalfdeadPageOperatorLeafpage(&lbuffer, xlrec, willInit);
        if (BufferIsValid(lbuffer.buf)) {
            MarkBufferDirty(lbuffer.buf);
        }
    }
    if (BufferIsValid(lbuffer.buf)) {
        UnlockReleaseBuffer(lbuffer.buf);
    }
}

/*
 * Redo dispatcher for UBTree3 records.
 *
 * The opcode is the record's info byte with the XLR_INFO_MASK bits cleared
 * and then restricted to XLOG_UBTREE_PCR_OP_MASK.  Opcodes that share a
 * handler are grouped; the boolean arguments the handler expects are derived
 * from which opcode of the group was seen.  An unrecognized opcode is a PANIC.
 */
void UBTree3Redo(XLogReaderState* record)
{
    const uint8 opcode = (XLogRecGetInfo(record) & ~XLR_INFO_MASK) & XLOG_UBTREE_PCR_OP_MASK;

    switch (opcode) {
        /* internal insert; the flag is set for the _META flavour */
        case XLOG_UBTREE3_INSERT_PCR_INTERNAL:
        case XLOG_UBTREE3_INSERT_PCR_META:
            UBTree3XlogInsertPcrInternal(record, opcode == XLOG_UBTREE3_INSERT_PCR_META);
            break;

        /* plain insert; the flag is set for the DUP_INSERT flavour */
        case XLOG_UBTREE3_DUP_INSERT:
        case XLOG_UBTREE3_INSERT_PCR:
            UBTree3XlogInsert(record, opcode == XLOG_UBTREE3_DUP_INSERT);
            break;

        case XLOG_UBTREE3_NEW_ROOT:
            UBTree3XlogNewRoot(record);
            break;

        case XLOG_UBTREE3_DELETE_PCR:
            UBTree3XlogDelete(record);
            break;

        case XLOG_UBTREE3_PRUNE_PAGE_PCR:
            UBTree3XlogPrunePage(record);
            break;

        case XLOG_UBTREE3_FREEZE_TD_SLOT:
            UBTree3XlogFreezeTdSlot(record);
            break;

        case XLOG_UBTREE3_REUSE_TD_SLOT:
            UBTree3XlogReuseTdSlot(record);
            break;

        case XLOG_UBTREE3_EXTEND_TD_SLOTS:
            UBTree3XlogExtendTdSlots(record);
            break;

        case XLOG_UBTREE3_ROLLBACK_TXN:
            UBTree3XlogRollBackTxn(record);
            break;

        /* the four split flavours: first flag for _L*, second flag for *_ROOT */
        case XLOG_UBTREE3_SPLIT_L:
        case XLOG_UBTREE3_SPLIT_R:
        case XLOG_UBTREE3_SPLIT_L_ROOT:
        case XLOG_UBTREE3_SPLIT_R_ROOT: {
            const bool leftFlavour = (opcode == XLOG_UBTREE3_SPLIT_L) || (opcode == XLOG_UBTREE3_SPLIT_L_ROOT);
            const bool rootFlavour = (opcode == XLOG_UBTREE3_SPLIT_L_ROOT) || (opcode == XLOG_UBTREE3_SPLIT_R_ROOT);
            UBTree3XlogSplit(record, leftFlavour, rootFlavour);
            break;
        }

        default:
            ereport(PANIC, (errmsg("UBTree3Redo: unknown op code %hhu", opcode)));
    }
}

/*
 * Redo dispatcher for UBTree4 records (page unlink and half-dead marking).
 *
 * The opcode is the record's info byte with the XLR_INFO_MASK bits cleared
 * and then restricted to XLOG_UBTREE_PCR_OP_MASK.  Both unlink flavours go
 * to the same handler, which receives the opcode so it can tell them apart.
 * An unrecognized opcode is a PANIC.
 */
void UBTree4Redo(XLogReaderState* record)
{
    const uint8 opcode = (XLogRecGetInfo(record) & ~XLR_INFO_MASK) & XLOG_UBTREE_PCR_OP_MASK;

    switch (opcode) {
        case XLOG_UBTREE4_UNLINK_PAGE:
        case XLOG_UBTREE4_UNLINK_PAGE_META:
            UBTree4XlogUnlinkPage(opcode, record);
            break;

        case XLOG_UBTREE4_MARK_PAGE_HALFDEAD:
            UBTree4XlogMarkPageHalfDead(record);
            break;

        default:
            ereport(PANIC, (errmsg("UBTree4Redo: unknown op code %hhu", opcode)));
    }
}

/*
 * Redo dispatcher for UBTree2 records.
 *
 * The opcode is the record's info byte with the XLR_INFO_MASK bits cleared.
 * Each recognized opcode is handed to its handler and the function returns;
 * anything that falls out of the switch is an unknown opcode and a PANIC.
 */
void UBTree2Redo(XLogReaderState* record)
{
    const uint8 opcode = XLogRecGetInfo(record) & ~XLR_INFO_MASK;

    switch (opcode) {
        case XLOG_UBTREE2_SHIFT_BASE:
            UBTree2XlogShiftBase(record);
            return;
        case XLOG_UBTREE2_RECYCLE_QUEUE_INIT_PAGE:
            UBTree2XlogRecycleQueueInitPage(record);
            return;
        case XLOG_UBTREE2_RECYCLE_QUEUE_ENDPOINT:
            UBTree2XlogRecycleQueueEndpoint(record);
            return;
        case XLOG_UBTREE2_RECYCLE_QUEUE_MODIFY:
            UBTree2XlogRecycleQueueModify(record);
            return;
        case XLOG_UBTREE2_FREEZE:
            UBTree2XlogFreeze(record);
            return;
        default:
            break;
    }

    ereport(PANIC, (errmsg("UBTree2Redo: unknown op code %hhu", opcode)));
}

/*
 * Reset the list of pending incomplete actions (t_thrd.xlog_cxt.incomplete_actions)
 * to empty.  Any list that was previously referenced there is not freed here.
 */
void UBTreeXlogStartup(void)
{
    t_thrd.xlog_cxt.incomplete_actions = NIL;
}

/*
 * Complete a page split whose parent insertion did not make it into the WAL.
 *
 * Both halves of the split are read and exclusively locked (left first, then
 * right), and UBTreeInsertParent is invoked through a fake relcache entry
 * built from the action's RelFileNode.  A missing half is a PANIC.
 *
 * action: the pending split; its node, leftblk, rightblk and is_root are used.
 */
void UBTreeXlogFinishIncompleteSplit(UBTreeIncompleteAction *action)
{
    /* Left half.  It was written earlier, so failing to find it is fatal. */
    Buffer leftBuf = XLogReadBufferExtended(action->node, MAIN_FORKNUM, action->leftblk, RBM_NORMAL, NULL);
    if (!BufferIsValid(leftBuf)) {
        ereport(PANIC, (errmsg("UBTreeXlogCleanup: left block unfound")));
    }
    LockBuffer(leftBuf, BUFFER_LOCK_EXCLUSIVE);
    Page leftPage = (Page)BufferGetPage(leftBuf);
    UBTPageOpaqueInternal leftOpaque = (UBTPageOpaqueInternal)PageGetSpecialPointer(leftPage);

    /* Right half, same treatment. */
    Buffer rightBuf = XLogReadBufferExtended(action->node, MAIN_FORKNUM, action->rightblk, RBM_NORMAL, NULL);
    if (!BufferIsValid(rightBuf)) {
        ereport(PANIC, (errmsg("UBTreeXlogCleanup: right block unfound")));
    }
    LockBuffer(rightBuf, BUFFER_LOCK_EXCLUSIVE);
    Page rightPage = (Page)BufferGetPage(rightBuf);
    UBTPageOpaqueInternal rightOpaque = (UBTPageOpaqueInternal)PageGetSpecialPointer(rightPage);

    /* The two halves make up their whole level when they are its leftmost and rightmost pages. */
    const bool onlyPagesOnLevel = P_LEFTMOST(leftOpaque) && P_RIGHTMOST(rightOpaque);

    Relation fakeRel = CreateFakeRelcacheEntry(action->node);
    UBTreeInsertParent(fakeRel, leftBuf, rightBuf, NULL, action->is_root, onlyPagesOnLevel);
    FreeFakeRelcacheEntry(fakeRel);
}

/* finish an incomplete deletion (of a half-dead page) */
void UBTreeXlogFinishIncompleteDeletion(const UBTreeIncompleteAction *action)
{
    Buffer buf;
    buf = XLogReadBufferExtended(action->node, MAIN_FORKNUM, action->delblk, RBM_NORMAL, NULL);
    if (BufferIsValid(buf)) {
        Relation reln;

        LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
        reln = CreateFakeRelcacheEntry(action->node);
        if (UBTreePageDel(reln, buf) == 0) {
            ereport(PANIC, (errmsg("UBTreeXlogCleanup: UBTreePageDel failed")));
        }
        FreeFakeRelcacheEntry(reln);
    }
}

void *UBTreeGetIncompleteActions()
{
    List *incompleteActions = t_thrd.xlog_cxt.incomplete_actions;
    t_thrd.xlog_cxt.incomplete_actions = NIL;
    return incompleteActions;
}

void UBTreeXlogCleanup(void)
{
    ListCell *l = NULL;
    TimestampTz start_time = GetCurrentTimestamp();
    int64 duration = 0;
    MemoryContext oldCtx = NULL;
    if (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull())) {
        oldCtx = MemoryContextSwitchTo(g_instance.comm_cxt.predo_cxt.parallelRedoCtx);
    }

    if (log_min_messages <= DEBUG4) {
        ereport(LOG, (errmsg("[BTREE_ACTION_TRACE]UBTreeXlogCleanup")));
    }

    /* we need to make sure we update root split and corresponding metapage first */
    foreach (l, t_thrd.xlog_cxt.incomplete_actions) {
        UBTreeIncompleteAction *action = (UBTreeIncompleteAction *)lfirst(l);
        t_thrd.xlog_cxt.imcompleteActionCnt++;
        ereport(WARNING, (errmsg("[BTREE_ACTION_TRACE]UBTreeXlogCleanup: action spc:%u,db:%u,rel:%u,"
                                 "is_split:%d,is_root:%d,leftblk:%u,rightblk:%u,level:%u,delblk:%u,happen:%u,enable:%u",
                                 action->node.spcNode, action->node.dbNode, action->node.relNode, action->is_split,
                                 action->is_root, action->leftblk, action->rightblk, action->level, action->delblk,
                                 t_thrd.xlog_cxt.forceFinishHappened,
                                 g_instance.attr.attr_storage.enable_update_max_page_flush_lsn)));
        if (FORCE_FINISH_ENABLED) {
            continue;
        }
        if (action->is_split && action->is_root) {
            if (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull())) {
                MemoryContext ctx = MemoryContextSwitchTo(oldCtx);
                UBTreeXlogFinishIncompleteSplit(action);
                (void)MemoryContextSwitchTo(ctx);
            } else {
                UBTreeXlogFinishIncompleteSplit(action);
            }
        }
    }

    foreach (l, t_thrd.xlog_cxt.incomplete_actions) {
        UBTreeIncompleteAction *action = (UBTreeIncompleteAction *)lfirst(l);
        ereport(WARNING, (errmsg("[BTREE_ACTION_TRACE]UBTreeXlogCleanup2: action spc:%u,db:%u,rel:%u,"
                                 "is_split:%u,is_root:%u,leftblk:%u,rightblk:%u,level:%u,delblk:%u,happen:%u,enable:%u",
                                 action->node.spcNode, action->node.dbNode, action->node.relNode, action->is_split,
                                 action->is_root, action->leftblk, action->rightblk, action->level, action->delblk,
                                 t_thrd.xlog_cxt.forceFinishHappened,
                                 g_instance.attr.attr_storage.enable_update_max_page_flush_lsn)));
        if (FORCE_FINISH_ENABLED) {
            continue;
        }

        if (action->is_split) {
            if (!action->is_root) {
                if (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull())) {
                    MemoryContext ctx = MemoryContextSwitchTo(oldCtx);
                    UBTreeXlogFinishIncompleteSplit(action);
                    (void)MemoryContextSwitchTo(ctx);
                } else {
                    UBTreeXlogFinishIncompleteSplit(action);
                }
            }
        } else {
            if (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull())) {
                MemoryContext ctx = MemoryContextSwitchTo(oldCtx);
                UBTreeXlogFinishIncompleteDeletion(action);
                (void)MemoryContextSwitchTo(ctx);
            } else {
                UBTreeXlogFinishIncompleteDeletion(action);
            }
        }
    }
    t_thrd.xlog_cxt.incomplete_actions = NIL;

    if (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull())) {
        (void)MemoryContextSwitchTo(oldCtx);
    }
    duration = GetCurrentTimestamp() - start_time;
    ereport(LOG, (errmodule(MOD_REDO), errcode(ERRCODE_LOG),
            errmsg("UBTreeXlogCleanup is over, it takes time:%ld microseconds", duration)));
}

bool UBTreeSafeRestartPoint(void)
{
    if (t_thrd.xlog_cxt.incomplete_actions)
        return false;
    return true;
}

/*
 * Finish and clear all pending incomplete actions.
 *
 * With a single recovery worker, or when no parallel-recovery dispatcher is
 * present, this is just UBTreeXlogCleanup().  Otherwise the pending list is
 * first replaced by whatever parallel_recovery::CheckImcompleteAction returns
 * for it (evaluated between SwitchToDispatcherContext and
 * EndDispatcherContext), and the cleanup runs on that result.
 */
void UBTreeClearIncompleteAction()
{
    if ((get_real_recovery_parallelism() <= 1) || parallel_recovery::DispatchPtrIsNull()) {
        UBTreeXlogCleanup();
        return;
    }

    SwitchToDispatcherContext();
    t_thrd.xlog_cxt.incomplete_actions =
            parallel_recovery::CheckImcompleteAction(t_thrd.xlog_cxt.incomplete_actions);
    EndDispatcherContext();

    UBTreeXlogCleanup();
}