/*
 * Copyright (c) 2020 Huawei Technologies Co.,Ltd.
 *
 * openGauss is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * ---------------------------------------------------------------------------------------
 *
 * ss_dms_bufmgr.cpp
 *        Provide common interface for read page within DMS process.
 *
 * IDENTIFICATION
 *        src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp
 *
 * ---------------------------------------------------------------------------------------
 */

#include "postgres.h"
#include "storage/proc.h"
#include "storage/buf/bufmgr.h"
#include "storage/smgr/segment.h"
#include "replication/ss_disaster_cluster.h"
#include "utils/resowner.h"
#include "ddes/dms/ss_dms_bufmgr.h"
#include "ddes/dms/ss_reform_common.h"
#include "ddes/dms/ss_dms_recovery.h"
#include "securec_check.h"
#include "miscadmin.h"
#include "access/double_write.h"
#include "access/multi_redo_api.h"

void InitDmsBufCtrl(void)
{
    bool found_dms_buf = false;
    t_thrd.storage_cxt.dmsBufCtl = (dms_buf_ctrl_t *)CACHELINEALIGN(ShmemInitStruct(
        "dms buffer ctrl", TOTAL_BUFFER_NUM * sizeof(dms_buf_ctrl_t) + PG_CACHE_LINE_SIZE, &found_dms_buf));

    if (!found_dms_buf) {
        for (int i = 0; i < TOTAL_BUFFER_NUM; i++) {
            dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(i);
            buf_ctrl->buf_id = i;
            buf_ctrl->state = 0;
            buf_ctrl->lock_mode = (uint8)DMS_LOCK_NULL;
            buf_ctrl->is_edp = 0;
            buf_ctrl->force_request = 0;
            buf_ctrl->edp_scn = 0;
            buf_ctrl->edp_map = 0;
            buf_ctrl->pblk_relno = InvalidOid;
            buf_ctrl->pblk_blkno = InvalidBlockNumber;
            buf_ctrl->pblk_lsn = InvalidXLogRecPtr;
            buf_ctrl->been_loaded = false;
            buf_ctrl->ctrl_lock = LWLockAssign(LWTRANCHE_DMS_BUF_CTRL, i);
            buf_ctrl->need_check_pincount = false;
            buf_ctrl->pinned_count = 0;
            buf_ctrl->lsn_on_disk = 0;
        }
    }
}

/*
 * Zero a DMS context and fill in the identity of the calling thread:
 * instance id, session id, db handle (the thread's PGPROC, may be NULL),
 * session type and the "is_try" flag (cleared).
 */
void InitDmsContext(dms_context_t *dmsContext)
{
    (void)memset_s(dmsContext, sizeof(dms_context_t), 0, sizeof(dms_context_t));

    /* Proc threads id range: [0, TotalProcs - 1]. Non-proc threads id range: [TotalProcs + 1, TotalProcs + 4] */
    uint32 proc_total = (uint32)(GLOBAL_ALL_PROCS);
    unsigned int session_id;
    if (t_thrd.proc) {
        session_id = (unsigned int)t_thrd.proc->logictid;
    } else {
        /* threads without a PGPROC are numbered after all proc threads */
        session_id = (unsigned int)(t_thrd.myLogicTid + proc_total);
    }

    dmsContext->inst_id = (unsigned int)SS_MY_INST_ID;
    dmsContext->sess_id = session_id;
    dmsContext->db_handle = t_thrd.proc;
    dmsContext->sess_type = DMSGetProcType4RequestPage();
    dmsContext->is_try = 0;
}

/*
 * Prepare a DMS context for a page-level request: the generic thread identity
 * is filled in by InitDmsContext(), then the resource is described as a page
 * whose resource id is the raw bytes of the given buffer tag.
 */
void InitDmsBufContext(dms_context_t* dmsBufCxt, BufferTag buftag)
{
    int32 clear_rc = memset_sp(dmsBufCxt, sizeof(dms_context_t), 0, sizeof(dms_context_t));
    securec_check(clear_rc, "", "");

    InitDmsContext(dmsBufCxt);

    /* resource descriptor: page type, id length, id bytes */
    dmsBufCxt->type = (unsigned char)DRC_RES_PAGE_TYPE;
    dmsBufCxt->len = DMS_PAGEID_SIZE;
    errno_t copy_rc = memcpy_s(dmsBufCxt->resid, DMS_PAGEID_SIZE, &buftag, sizeof(BufferTag));
    securec_check_c(copy_rc, "\0", "\0");
}

/*
 * Translate a LOCKTAG into the resource id of a DMS distributed latch.
 * The resource id is zeroed first, each LOCKTAG field is copied into its
 * counterpart, and the latch handle is reset to NULL.
 */
void TransformLockTagToDmsLatch(dms_drlatch_t* dlatch, const LOCKTAG locktag)
{
    int32 rc = memset_sp(&dlatch->drid, sizeof(dms_drid_t), 0, sizeof(dms_drid_t));
    securec_check(rc, "", "");

    dlatch->handle = NULL;

    /* field-by-field mapping: LOCKTAG -> dms_drid_t */
    dlatch->drid.type   = locktag.locktag_type;
    dlatch->drid.oid    = locktag.locktag_field1;
    dlatch->drid.index  = locktag.locktag_field2;
    dlatch->drid.parent = locktag.locktag_field3;
    dlatch->drid.part   = locktag.locktag_field4;
    dlatch->drid.uid    = locktag.locktag_field5;
}

/*
 * For a buffer that belongs to a segment-page relation, look up the physical
 * location of the page and store it (extent type and physical block number)
 * in the buffer descriptor's extra info. Buffers of non-segment relations are
 * left untouched.
 */
static void CalcSegDmsPhysicalLoc(BufferDesc* buf_desc, Buffer buffer, bool check_standby)
{
    if (!IsSegmentFileNode(buf_desc->tag.rnode)) {
        return;
    }

    /* the tag must be a logical relation node, not a physical segment file */
    SegmentCheck(!IsSegmentPhysicalRelNode(buf_desc->tag.rnode));
    ereport(WARNING, (errmsg("[SS] buffer:%d is segdata page, bufdesc seginfo is empty", buffer)));

    SegPageLocation phys_loc = seg_get_physical_location(buf_desc->tag.rnode, buf_desc->tag.forkNum,
        buf_desc->tag.blockNum, check_standby);
    SegmentCheck(phys_loc.blocknum != InvalidBlockNumber);

    buf_desc->extra->seg_fileno = (uint8)EXTENT_SIZE_TO_TYPE((int)phys_loc.extent_size);
    buf_desc->extra->seg_blockno = phys_loc.blocknum;
}

/*
 * Tell whether the DMS lock mode currently recorded in buf_ctrl already covers
 * the requested LWLock mode:
 *   LW_SHARED    is covered by DMS_LOCK_SHARE or DMS_LOCK_EXCLUSIVE;
 *   LW_EXCLUSIVE is covered only by DMS_LOCK_EXCLUSIVE.
 * Any other requested mode trips an assertion and is reported as incompatible.
 */
bool LockModeCompatible(dms_buf_ctrl_t *buf_ctrl, LWLockMode mode)
{
    if (mode == LW_SHARED) {
        /* read the held mode once, then test it against both acceptable values */
        const int held_mode = buf_ctrl->lock_mode;
        return (held_mode == DMS_LOCK_SHARE || held_mode == DMS_LOCK_EXCLUSIVE);
    }

    if (mode == LW_EXCLUSIVE) {
        return (buf_ctrl->lock_mode == (uint8)DMS_LOCK_EXCLUSIVE);
    }

    AssertEreport(0, MOD_DMS, "[SS] lock mode value is wrong");
    return false;
}

void MarkReadPblk(int buf_id, const XLogPhyBlock *pblk)
{
    dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_id);
    if (pblk) {
        buf_ctrl->pblk_relno = pblk->relNode;
        buf_ctrl->pblk_blkno = pblk->block;
        buf_ctrl->pblk_lsn = pblk->lsn;
    } else {
        buf_ctrl->pblk_relno = InvalidOid;
        buf_ctrl->pblk_blkno = InvalidBlockNumber;
        buf_ctrl->pblk_lsn = InvalidXLogRecPtr;
    }
}

void MarkReadHint(int buf_id, char persistence, bool extend, const XLogPhyBlock *pblk)
{
    dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_id);
    if (persistence == 'p') {
        buf_ctrl->state |= BUF_IS_RELPERSISTENT;
    } else if (persistence == 't') {
        buf_ctrl->state |= BUF_IS_RELPERSISTENT_TEMP;
    }

    if (extend) {
        buf_ctrl->state |= BUF_IS_EXTEND;
    }

    MarkReadPblk(buf_id, pblk);
}

void ClearReadHint(int buf_id, bool buf_deleted)
{
    dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_id);
    buf_ctrl->state &=
        ~(BUF_NEED_LOAD | BUF_IS_LOADED | BUF_LOAD_FAILED | BUF_NEED_TRANSFER | BUF_IS_EXTEND | BUF_DIRTY_NEED_FLUSH);
    if (buf_deleted) {
        buf_ctrl->state = 0;
    }
    buf_ctrl->seg_fileno = EXTENT_INVALID;
    buf_ctrl->seg_blockno = InvalidBlockNumber;
}

/*
 * Ask DMS for the page behind buf_desc in the lock mode matching `mode`
 * (LW_SHARED -> DMS_LOCK_SHARE, anything else -> DMS_LOCK_EXCLUSIVE).
 * The request is issued while holding the buffer's DMS control lock.
 *
 * Returns true when dms_request_page() reports DMS_SUCCESS.
 * Returns false when the request is rejected or fails; a caller holding the
 * content_lock should release the content_lock and io_in_process lock and retry.
 */
bool StartReadPage(BufferDesc *buf_desc, LWLockMode mode)
{
    dms_buf_ctrl_t *ctrl = GetDmsBufCtrl(buf_desc->buf_id);

    dms_lock_mode_t wanted_mode;
    if (mode == LW_SHARED) {
        wanted_mode = DMS_LOCK_SHARE;
    } else {
        wanted_mode = DMS_LOCK_EXCLUSIVE;
    }

    dms_context_t request_ctx;
    InitDmsBufContext(&request_ctx, buf_desc->tag);

    LWLockAcquire((LWLock*)ctrl->ctrl_lock, LW_EXCLUSIVE);
    int request_rc = dms_request_page(&request_ctx, ctrl, wanted_mode);
    LWLockRelease((LWLock*)ctrl->ctrl_lock);

    return (request_rc == DMS_SUCCESS);
}

/*
 * Release-build sanity check for a page that was transferred rather than loaded
 * from disk: the LSN of the in-memory page must not be older than the on-disk
 * LSN remembered in the buffer's DMS control entry. A violation is reported
 * with PANIC. Pages whose in-memory LSN is invalid are not checked, and the
 * check is skipped entirely on a disaster-recovery standby cluster.
 */
void SegPageCheckDiskLSNForRelease(BufferDesc *buf_desc)
{
    if (SS_DISASTER_STANDBY_CLUSTER) {
        return;
    }

    dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
    XLogRecPtr lsn_on_mem = PageGetLSN(BufHdrGetBlock(buf_desc));

    /* a transferred page must be the latest version: its LSN may not be below the recorded lsn_on_disk */
    if ((lsn_on_mem != InvalidXLogRecPtr) && XLByteLT(lsn_on_mem, buf_ctrl->lsn_on_disk)) {
        ereport(PANIC, (errmsg("[SS check][%d/%d/%d/%d/%d %d-%d] memory lsn(0x%llx) is less than lsn_on_disk(0x%llx)",
                buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
                (int)buf_desc->tag.rnode.bucketNode, (int)buf_desc->tag.rnode.opt, buf_desc->tag.forkNum,
                buf_desc->tag.blockNum, (unsigned long long)lsn_on_mem, (unsigned long long)buf_ctrl->lsn_on_disk)));
    }
}

#ifdef USE_ASSERT_CHECKING
/*
 * Read one physical segment block into blockbuf for verification purposes.
 * The read is repeated while the page fails verification, up to three extra
 * times after the first attempt.
 *
 * Returns SMGR_RD_OK when a read produced a verified page, otherwise
 * SMGR_RD_CRC_ERROR.
 */
SMGR_READ_STATUS SmgrNetPageCheckRead(Oid spcNode, Oid dbNode, Oid relNode, ForkNumber forkNum,
    BlockNumber blockNo, char *blockbuf)
{
    SegSpace *seg_space = spc_open(spcNode, dbNode, false);
    SegmentCheck(seg_space);

    /* physical file node inside the segment space */
    RelFileNode phys_node = {
        .spcNode = seg_space->spcNode,
        .dbNode = seg_space->dbNode,
        .relNode = relNode,
        .bucketNode = SegmentBktId,
        .opt = 0
    };

    int retries_left = 3;
    SMGR_READ_STATUS status;
    for (;;) {
        seg_physical_read(seg_space, phys_node, forkNum, blockNo, (char *)blockbuf);
        status = PageIsVerified((Page)blockbuf, blockNo) ? SMGR_RD_OK : SMGR_RD_CRC_ERROR;

        if (status != SMGR_RD_CRC_ERROR || retries_left <= 0) {
            break;
        }
        retries_left--;
    }

    return status;
}

/*
 * Debug-build check that a page received into buf_desc is not older than the
 * copy on disk. Runs only when page-version verification is enabled, the read
 * mode is RBM_NORMAL, and the buffer either has a known segment location or is
 * a segment buffer.
 *
 * The on-disk copy is read into a temporary aligned buffer:
 *   - from the physical block given by pblk, when pblk is not NULL;
 *   - else from the segment location stored in the buffer descriptor;
 *   - else through smgrread() using the buffer tag.
 * If the disk read fails verification, a WARNING is logged and the check is
 * skipped. If the disk LSN is newer than a valid in-memory LSN, the mismatch is
 * reported with PANIC, downgraded to WARNING during recovery or on-demand
 * recovery.
 */
void SmgrNetPageCheckDiskLSN(BufferDesc *buf_desc, ReadBufferMode read_mode, const XLogPhyBlock *pblk)
{
    /*
     * prerequisite is that the page that initialized to zero in memory should be flush to disk
     */
    if (ENABLE_VERIFY_PAGE_VERSION && (buf_desc->extra->seg_fileno != EXTENT_INVALID ||
        IsSegmentBufferID(buf_desc->buf_id)) && (read_mode == RBM_NORMAL)) {
        /* over-allocate so the scratch page can be aligned for buffer I/O */
        char *origin_buf = (char *)palloc(BLCKSZ + ALIGNOF_BUFFER);
        char *temp_buf = (char *)BUFFERALIGN(origin_buf);
        SMgrRelation smgr = smgropen(buf_desc->tag.rnode, InvalidBackendId);
        SMGR_READ_STATUS rdStatus;
        if (pblk != NULL) {
            rdStatus = SmgrNetPageCheckRead(smgr->smgr_rnode.node.spcNode, smgr->smgr_rnode.node.dbNode, pblk->relNode,
                                            buf_desc->tag.forkNum, pblk->block, (char *)temp_buf);
        } else if (buf_desc->extra->seg_fileno != EXTENT_INVALID) {
            rdStatus = SmgrNetPageCheckRead(smgr->smgr_rnode.node.spcNode, smgr->smgr_rnode.node.dbNode,
                                            buf_desc->extra->seg_fileno, buf_desc->tag.forkNum,
                                            buf_desc->extra->seg_blockno, (char *)temp_buf);
        } else {
            rdStatus = smgrread(smgr, buf_desc->tag.forkNum, buf_desc->tag.blockNum, (char *)temp_buf);
        }

        if (rdStatus == SMGR_RD_CRC_ERROR) {
            ereport(WARNING, (errmsg("[SS][%d/%d/%d/%d/%d %d-%d] read from disk error, maybe buffer in flush",
                buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
                (int)buf_desc->tag.rnode.bucketNode, (int)buf_desc->tag.rnode.opt, buf_desc->tag.forkNum,
                buf_desc->tag.blockNum)));
            /* release the scratch page on this early exit too, otherwise it leaks */
            pfree(origin_buf);
            return;
        }
        XLogRecPtr lsn_on_disk = PageGetLSN(temp_buf);
        XLogRecPtr lsn_on_mem = PageGetLSN(BufHdrGetBlock(buf_desc));
        /* maybe some pages are not protected by WAL-Logged */
        if ((lsn_on_mem != InvalidXLogRecPtr) && (lsn_on_disk > lsn_on_mem)) {
            RelFileNode rnode = buf_desc->tag.rnode;
            int elevel = WARNING;
            if (!RecoveryInProgress() && !SS_IN_ONDEMAND_RECOVERY) {
                elevel = PANIC;
            }
            ereport(elevel, (errmsg("[SS][%d/%d/%d/%d/%d %d-%d] memory lsn(0x%llx) is less than disk lsn(0x%llx)",
                rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt,
                buf_desc->tag.forkNum, buf_desc->tag.blockNum,
                (unsigned long long)lsn_on_mem, (unsigned long long)lsn_on_disk)));
        }
        pfree(origin_buf);
    }
}
#endif

/*
 * Finish a page read that was started through DMS for a regular buffer.
 *
 * Two cases, selected by the BUF_NEED_LOAD bit in the buffer's DMS control state:
 *   - bit set:   the page is read via ReadBuffer_common_for_dms(); if the thread's
 *                page_need_retry flag is set afterwards, InvalidBuffer is returned
 *                immediately (none of the cleanup below runs in that case).
 *   - bit clear: the page content is already in the buffer; its checksum is set in
 *                place, an LSN sanity check is run (debug and release builds use
 *                different checks), and the segment physical location is computed
 *                when it is still unknown.
 *
 * Afterwards the control entry is marked as loaded (for a valid buffer), the
 * content lock is taken exclusively for the RBM_ZERO_AND_*LOCK modes when not
 * already held, the read hints are cleared and the buffer I/O is terminated with
 * BM_VALID.
 *
 * Returns the buffer number, or InvalidBuffer on the retry path described above.
 */
Buffer TerminateReadPage(BufferDesc* buf_desc, ReadBufferMode read_mode, const XLogPhyBlock *pblk)
{
    dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
    Buffer buffer;
    if (buf_ctrl->state & BUF_NEED_LOAD) {
        /* the reform proc must never have to read from disk during flush copy */
        if (g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy && AmDmsReformProcProcess()) {
            ereport(PANIC, (errmsg("[SS] In flush copy, can't read from disk!")));
        }
        buffer = ReadBuffer_common_for_dms(read_mode, buf_desc, pblk);
        if (t_thrd.dms_cxt.page_need_retry) {
            return InvalidBuffer;
        }
    } else {
#ifdef USE_ASSERT_CHECKING
        /* debug builds: a page that reaches this branch must not be an extend page ... */
        if (buf_ctrl->state & BUF_IS_EXTEND) {
            ereport(PANIC, (errmsg("[SS] extend page should not be tranferred from DMS, "
                "and needs to be loaded from disk!")));
        }
        /* ... and its control entry must already be marked as loaded */
        if (buf_ctrl->been_loaded == false) {
            ereport(PANIC, (errmsg("[SS] ctrl not marked loaded before transferring from remote")));
        }
#endif

        /* stamp the checksum on the page that is already in the buffer */
        Block bufBlock = BufHdrGetBlock(buf_desc);
        Page page = (Page)(bufBlock);
        PageSetChecksumInplace(page, buf_desc->tag.blockNum);

#ifdef USE_ASSERT_CHECKING
        SmgrNetPageCheckDiskLSN(buf_desc, read_mode, pblk);
#else
        SegPageCheckDiskLSNForRelease(buf_desc);
#endif
        buffer = BufferDescriptorGetBuffer(buf_desc);
        /* segment location still unknown: compute it (only outside recovery, or during flush copy) */
        if ((!RecoveryInProgress() || g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy) &&
            buf_desc->extra->seg_fileno == EXTENT_INVALID) {
            CalcSegDmsPhysicalLoc(buf_desc, buffer, !g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy);
        }
        buf_desc->extra->lsn_on_disk = InvalidXLogRecPtr;
    }
    if (BufferIsValid(buffer)) {
        buf_ctrl->been_loaded = true;
    }

    /* these read modes return the buffer exclusively locked; take the lock unless already held */
    if ((read_mode == RBM_ZERO_AND_LOCK || read_mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
        !LWLockHeldByMe(buf_desc->content_lock)) {
        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
    }

    ClearReadHint(buf_desc->buf_id);

    /* standby worker threads: drop the "need check pin" record for non-segment buffers */
    if (SS_STANDBY_MODE && (buf_desc->state & BM_VALID) && buf_ctrl->need_check_pincount && SS_AM_WORKER) {
        if (!(IsSegmentBufferID(buf_desc->buf_id))) {
            ForgetBufferNeedCheckPin(buf_desc->buf_id + 1);
        }
    }

    TerminateBufferIO(buf_desc, false, BM_VALID);

    /*
     * During on-demand recovery on the primary, redo items must be applied to get
     * the latest page. Skipped for page-redo threads and when the content lock is
     * already held by this thread.
     */
    if (t_thrd.role != PAGEREDO && SS_PRIMARY_ONDEMAND_RECOVERY && !LWLockHeldByMe(buf_desc->content_lock)) {
        buf_desc = RedoForOndemandExtremeRTOQuery(buf_desc, RELPERSISTENCE_PERMANENT, buf_desc->tag.forkNum,
            buf_desc->tag.blockNum, read_mode);
    }
    return buffer;
}

/*
 * Decide whether the caller has to request the page from DMS and, if so, mark
 * the buffer as having I/O in progress.
 *
 * Returns false (no DMS request needed / allowed) when:
 *   - this thread already holds the buffer's io_in_progress_lock;
 *   - the DMS lock mode already held is compatible with `mode` and no I/O is in
 *     progress on the buffer (checked without locks as a fast path);
 *   - after waiting out any in-progress I/O and locking the buffer header, the
 *     held DMS lock mode turns out to be compatible (the locks taken here are
 *     released again before returning).
 *
 * Returns true otherwise. In that case BM_IO_IN_PROGRESS has been set in the
 * buffer state, io_in_progress_lock is still held by this thread, and the
 * buffer is recorded as this thread's in-progress input buffer.
 */
static bool DmsStartBufferIO(BufferDesc *buf_desc, LWLockMode mode)
{
    uint64 buf_state;
    dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);

    /* sanity: this thread must not already have a different buffer I/O in progress */
    if (IsSegmentBufferID(buf_desc->buf_id)) {
        Assert(!HasInProgressBuf());
    } else {
        Assert(!t_thrd.storage_cxt.InProgressBuf || t_thrd.storage_cxt.InProgressBuf == buf_desc);
    }

    if (LWLockHeldByMe(buf_desc->io_in_progress_lock)) {
        return false;
    }

    /* fast path: lock mode already sufficient and nobody is doing I/O on the buffer */
    if (LockModeCompatible(buf_ctrl, mode)) {
        if (!(pg_atomic_read_u64(&buf_desc->state) & BM_IO_IN_PROGRESS)) {
            return false;
        }
    }

    /* take io_in_progress_lock and the header lock; wait and retry while another I/O is running */
    for (;;) {
        (void)LWLockAcquire(buf_desc->io_in_progress_lock, LW_EXCLUSIVE);

        buf_state = LockBufHdr(buf_desc);
        if (!(buf_state & BM_IO_IN_PROGRESS)) {
            break;
        }

        UnlockBufHdr(buf_desc, buf_state);
        LWLockRelease(buf_desc->io_in_progress_lock);
        WaitIO(buf_desc);
    }

    /* re-check under the locks: the lock mode may have become compatible in the meantime */
    if (LockModeCompatible(buf_ctrl, mode)) {
        UnlockBufHdr(buf_desc, buf_state);
        LWLockRelease(buf_desc->io_in_progress_lock);
        return false;
    }

    /* publish the in-progress flag; io_in_progress_lock stays held on this path */
    buf_state |= BM_IO_IN_PROGRESS;
    UnlockBufHdr(buf_desc, buf_state);

    if (IsSegmentBufferID(buf_desc->buf_id)) {
        SetInProgressFlags(buf_desc, true);
    } else {
        t_thrd.storage_cxt.InProgressBuf = buf_desc;
        t_thrd.storage_cxt.IsForInput = true;
    }
    return true;
}

#ifdef USE_ASSERT_CHECKING
/*
 * Debug-build check for a segment buffer: read the on-disk copy of the page
 * into a scratch buffer and PANIC if its LSN is newer than a valid LSN of the
 * in-memory page. Only runs when page-version verification is enabled and the
 * read mode is RBM_NORMAL.
 */
void SegNetPageCheckDiskLSN(BufferDesc *buf_desc, ReadBufferMode read_mode, SegSpace *spc)
{
    /*
     * prerequisite: a page that was initialized to zero in memory has already
     * been flushed to disk (see seg_extend)
     */
    if (!(ENABLE_VERIFY_PAGE_VERSION && (read_mode == RBM_NORMAL))) {
        return;
    }

    /* over-allocate so the scratch page can be buffer-aligned */
    char *scratch_base = (char *)palloc(BLCKSZ + ALIGNOF_BUFFER);
    char *disk_page = (char *)BUFFERALIGN(scratch_base);
    ReadSegBufferForCheck(buf_desc, read_mode, spc, disk_page);

    XLogRecPtr lsn_on_disk = PageGetLSN(disk_page);
    XLogRecPtr lsn_on_mem = PageGetLSN(BufHdrGetBlock(buf_desc));

    /* maybe some pages are not protected by WAL-Logged */
    bool mem_is_stale = (lsn_on_mem != InvalidXLogRecPtr) && (lsn_on_disk > lsn_on_mem);
    if (mem_is_stale) {
        RelFileNode rnode = buf_desc->tag.rnode;
        ereport(PANIC, (errmsg("[SS][%d/%d/%d/%d/%d %d-%d] memory lsn(0x%llx) is less than disk lsn(0x%llx)",
            rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt,
            buf_desc->tag.forkNum, buf_desc->tag.blockNum,
            (unsigned long long)lsn_on_mem, (unsigned long long)lsn_on_disk)));
    }

    pfree(scratch_base);
}
#endif

/*
 * Finish a DMS-initiated read for a segment buffer.
 *
 * If the DMS control state carries BUF_NEED_LOAD the page is read through
 * ReadSegBufferForDMS(). Otherwise the page already in the buffer gets its
 * checksum set, is LSN-checked (different checks for debug and release
 * builds), and the segment buffer I/O is terminated with BM_VALID.
 *
 * A buffer that is not invalid marks the control entry as loaded. For the
 * RBM_ZERO_AND_*LOCK modes the content lock is taken exclusively unless this
 * thread already holds it. Read hints are cleared before returning the buffer.
 */
Buffer TerminateReadSegPage(BufferDesc *buf_desc, ReadBufferMode read_mode, SegSpace *spc)
{
    dms_buf_ctrl_t *ctrl = GetDmsBufCtrl(buf_desc->buf_id);
    Buffer result;

    if (!(ctrl->state & BUF_NEED_LOAD)) {
        /* page content was delivered into the buffer; finalize it in place */
        Page seg_page = (Page)BufHdrGetBlock(buf_desc);
        PageSetChecksumInplace(seg_page, buf_desc->tag.blockNum);

#ifdef USE_ASSERT_CHECKING
        SegNetPageCheckDiskLSN(buf_desc, read_mode, spc);
#else
        SegPageCheckDiskLSNForRelease(buf_desc);
#endif

        SegTerminateBufferIO(buf_desc, false, BM_VALID);
        result = BufferDescriptorGetBuffer(buf_desc);
    } else {
        result = ReadSegBufferForDMS(buf_desc, read_mode, spc);
    }

    if (!BufferIsInvalid(result)) {
        ctrl->been_loaded = true;
    }

    bool wants_exclusive = (read_mode == RBM_ZERO_AND_LOCK || read_mode == RBM_ZERO_AND_CLEANUP_LOCK);
    if (wants_exclusive && !LWLockHeldByMe(buf_desc->content_lock)) {
        LockBuffer(result, BUFFER_LOCK_EXCLUSIVE);
    }

    ClearReadHint(buf_desc->buf_id);
    return result;
}

/*
 * Bring a segment buffer up to date through DMS.
 *
 * Returns `buffer` unchanged for temp-persistence buffers and when no DMS
 * request turns out to be necessary. Returns 0 when the buffer is currently
 * not accessible (with *with_io cleared) or when the page request fails.
 * On a successful request, *with_io is cleared and the result of
 * TerminateReadSegPage() is returned.
 */
Buffer DmsReadSegPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, bool* with_io)
{
    BufferDesc *desc = GetBufferDescriptor(buffer - 1);
    dms_buf_ctrl_t *ctrl = GetDmsBufCtrl(desc->buf_id);

    /* temp relations never go through DMS */
    if (ctrl->state & BUF_IS_RELPERSISTENT_TEMP) {
        return buffer;
    }

    if (DmsCheckBufAccessible()) {
        if (DmsStartBufferIO(desc, mode)) {
            if (!StartReadPage(desc, mode)) {
                return 0;
            }
            *with_io = false;
            return TerminateReadSegPage(desc, read_mode);
        }

        /* no request needed; re-check accessibility before handing the buffer back */
        if (DmsCheckBufAccessible()) {
            return buffer;
        }
    }

    /* buffer is not accessible right now */
    *with_io = false;
    return 0;
}

/*
 * Bring a regular buffer up to date through DMS.
 *
 * Returns `buffer` unchanged for temp-persistence buffers and when no DMS
 * request turns out to be necessary. Returns 0 when the buffer is currently
 * not accessible (with *with_io cleared), when the primary could not be asked
 * to redo the page during on-demand recovery, or when the page request fails.
 * On a successful request, *with_io is cleared and the result of
 * TerminateReadPage() is returned; the physical block hint stored in the
 * control entry is passed along when its relno is valid.
 */
Buffer DmsReadPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, bool *with_io)
{
    BufferDesc *desc = GetBufferDescriptor(buffer - 1);
    dms_buf_ctrl_t *ctrl = GetDmsBufCtrl(desc->buf_id);

    /* temp relations never go through DMS */
    if (ctrl->state & BUF_IS_RELPERSISTENT_TEMP) {
        return buffer;
    }

    /* snapshot the physical block hint, if one was recorded */
    XLogPhyBlock phys_hint = {0, 0, 0};
    if (OidIsValid(ctrl->pblk_relno)) {
        Assert(ExtentTypeIsValid(ctrl->pblk_relno));
        Assert(ctrl->pblk_blkno != InvalidBlockNumber);
        phys_hint.relNode = ctrl->pblk_relno;
        phys_hint.block = ctrl->pblk_blkno;
        phys_hint.lsn = ctrl->pblk_lsn;
    }

    if (DmsCheckBufAccessible()) {
        if (DmsStartBufferIO(desc, mode)) {
            // standby node must notify primary node for prepare lastest page in ondemand recovery
            if (SS_STANDBY_ONDEMAND_NOT_NORMAL && !SSOndemandRequestPrimaryRedo(desc->tag)) {
                return 0;
            }

            if (!StartReadPage(desc, mode)) {
                return 0;
            }

            *with_io = false;
            return TerminateReadPage(desc, read_mode, OidIsValid(ctrl->pblk_relno) ? &phys_hint : NULL);
        }

        /* no request needed; re-check accessibility before handing the buffer back */
        if (DmsCheckBufAccessible()) {
            return buffer;
        }
    }

    /* buffer is not accessible right now */
    *with_io = false;
    return 0;
}

/*
 * Ask the primary node to redo the page identified by `tag` during on-demand
 * recovery.
 *
 * Returns without sending anything when:
 *   - the standby is in on-demand build           -> false;
 *   - the standby is in normal state, or this is the primary -> true;
 *   - a reform is in progress (caller retries later) -> false.
 *
 * Otherwise the request is sent. A failed send refreshes the reformer control
 * data and returns false. On a successful send the reported redo status
 * decides: DONE and SKIP (and any unlisted status) return true, SKIP also
 * refreshes the reformer control data, ERROR raises PANIC, FAIL logs a WARNING
 * and returns false.
 */
bool SSOndemandRequestPrimaryRedo(BufferTag tag)
{
    if (unlikely(SS_STANDBY_ONDEMAND_BUILD)) {
        return false;
    }
    if (SS_STANDBY_ONDEMAND_NORMAL || SS_PRIMARY_MODE) {
        return true;
    }
    // retry after reform finish
    if (SS_IN_REFORM) {
        return false;
    }

    ereport(DEBUG1,
        (errmsg("[SS][On-demand][%u/%u/%u/%d %d-%u] Start request primary node redo page.",
            tag.rnode.spcNode, tag.rnode.dbNode,
            tag.rnode.relNode, tag.rnode.bucketNode, tag.forkNum, tag.blockNum)));

    dms_context_t dms_ctx;
    int32 redo_status = ONDEMAND_REDO_TIMEOUT;
    InitDmsContext(&dms_ctx);
    dms_ctx.xmap_ctx.dest_id = (unsigned int)SS_PRIMARY_ID;

    int send_rc = dms_reform_req_opengauss_ondemand_redo_buffer(&dms_ctx, &tag,
        (unsigned int)sizeof(BufferTag), &redo_status);
    if (send_rc != DMS_SUCCESS) {
        SSReadReformerCtrl();
        ereport(LOG,
            (errmsg("[SS][On-demand][%u/%u/%u/%d %d-%u] Request primary node redo page timeout.",
                tag.rnode.spcNode, tag.rnode.dbNode,
                tag.rnode.relNode, tag.rnode.bucketNode, tag.forkNum, tag.blockNum)));
        return false;
    }

    switch (redo_status) {
        case ONDEMAND_REDO_DONE:
            ereport(DEBUG1,
                (errmsg("[SS][On-demand][%u/%u/%u/%d %d-%u] Request primary node redo page done.",
                    tag.rnode.spcNode, tag.rnode.dbNode,
                    tag.rnode.relNode, tag.rnode.bucketNode, tag.forkNum, tag.blockNum)));
            break;
        case ONDEMAND_REDO_SKIP:
            ereport(DEBUG1,
                (errmsg("[SS][On-demand] Primary node is not in ondemand recovery now and "
                    "ignore this redo request, so refresh reform control file")));
            SSReadReformerCtrl();
            break;
        case ONDEMAND_REDO_ERROR:
            ereport(PANIC,
                (errmsg("[SS][On-demand][%u/%u/%u/%d %d-%u] Error happend in request primary node redo page, "
                    "read buffer crash.", tag.rnode.spcNode, tag.rnode.dbNode,
                    tag.rnode.relNode, tag.rnode.bucketNode, tag.forkNum, tag.blockNum)));
            return false;
        case ONDEMAND_REDO_FAIL:
            ereport(WARNING,
                (errmsg("[SS][On-demand][%u/%u/%u/%d %d-%u] Error happend in request primary node redo page, "
                    "buffer is invalid", tag.rnode.spcNode, tag.rnode.dbNode,
                    tag.rnode.relNode, tag.rnode.bucketNode, tag.forkNum, tag.blockNum)));
            return false;
        default:
            break;
    }
    return true;
}

/*
 * Ask DMS to release this node's ownership of the page identified by buf_tag.
 * Buffers flagged as temp-persistent are reported as released without
 * contacting DMS. The request is made under the buffer's DMS control lock.
 *
 * Returns true only when dms_release_owner() succeeds and reports the page as
 * released.
 */
bool DmsReleaseOwner(BufferTag buf_tag, int buf_id)
{
    dms_buf_ctrl_t *ctrl = GetDmsBufCtrl(buf_id);
    if (ctrl->state & BUF_IS_RELPERSISTENT_TEMP) {
        return true;
    }

    dms_context_t release_ctx;
    InitDmsBufContext(&release_ctx, buf_tag);

    unsigned char was_released = 0;
    LWLockAcquire((LWLock*)ctrl->ctrl_lock, LW_EXCLUSIVE);
    int release_rc = dms_release_owner(&release_ctx, ctrl, &was_released);
    LWLockRelease((LWLock*)ctrl->ctrl_lock);

    if (release_rc != DMS_SUCCESS) {
        return false;
    }
    return (was_released != 0);
}

/*
 * Hand the buffer's current LSN and dirty status to dms_validate_drc() so DMS
 * can validate its resource entry for this page. Asserts that the buffer is not
 * an early dirty page and that the checkpoint redo pointer is valid.
 */
void BufValidateDrc(BufferDesc *buf_desc)
{
    dms_buf_ctrl_t *ctrl = GetDmsBufCtrl(buf_desc->buf_id);
    Assert(ctrl != NULL);
    Assert(ctrl->is_edp != 1);
    Assert(XLogRecPtrIsValid(g_instance.dms_cxt.ckptRedo));

    dms_context_t validate_ctx;
    InitDmsBufContext(&validate_ctx, buf_desc->tag);

    unsigned long long page_lsn = (unsigned long long)BufferGetLSN(buf_desc);
    bool dirty = SSBufferIsDirty(buf_desc);
    dms_validate_drc(&validate_ctx, ctrl, page_lsn, (unsigned char)dirty);
}

/*
 * Broadcast a DDL lock acquisition request through DMS.
 *
 * locktag/lockmode/sessionLock/dontWait are packed into the broadcast message;
 * reqType is forwarded to DMS as the request type.
 *
 * Retry policy:
 *   - dontWait == false: up to SS_BROADCAST_FAILED_RETRYCOUNTS attempts, each
 *     with the five-second broadcast timeout, sleeping 5 ms after every failure.
 *   - dontWait == true:  exactly one attempt with the short broadcast timeout
 *     and no sleep. (Previously the loop condition excluded this case entirely,
 *     so the request was never sent and the call always failed.)
 *
 * While broadcasting, the thread's output destination is switched to DestNone
 * and restored afterwards.
 *
 * Returns DMS_SUCCESS when a broadcast succeeded, otherwise the result of the
 * last attempt (DMS_ERROR if no attempt was made).
 */
int SSLockAcquire(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock, bool dontWait,
    dms_opengauss_lock_req_type_t reqType)
{
    dms_context_t dms_ctx;
    InitDmsContext(&dms_ctx);
    SSBroadcastDDLLock ssmsg;
    ssmsg.type = BCAST_DDLLOCK;
    errno_t rc = memcpy_s(&(ssmsg.locktag), sizeof(LOCKTAG), locktag, sizeof(LOCKTAG));
    securec_check(rc, "\0", "\0");
    ssmsg.lockmode = lockmode;
    ssmsg.sessionlock = sessionLock;
    ssmsg.dontWait = dontWait;

    /* a conditional (dontWait) request gets a single try; a blocking one gets the full retry budget */
    unsigned int attempts_left = dontWait ? 1U : (unsigned int)SS_BROADCAST_FAILED_RETRYCOUNTS;
    int ret = DMS_ERROR;

    int output_backup = t_thrd.postgres_cxt.whereToSendOutput;
    t_thrd.postgres_cxt.whereToSendOutput = DestNone;
    while (attempts_left > 0) {
        ret = dms_broadcast_opengauss_ddllock(&dms_ctx, (char *)&ssmsg, sizeof(SSBroadcastDDLLock),
            (unsigned char)false, dontWait ? SS_BROADCAST_WAIT_FIVE_MICROSECONDS : SS_BROADCAST_WAIT_FIVE_SECONDS,
            (unsigned char)reqType);
        if (ret == DMS_SUCCESS) {
            break;
        }
        attempts_left--;
        if (!dontWait) {
            pg_usleep(5000L);
        }
    }

    t_thrd.postgres_cxt.whereToSendOutput = output_backup;
    return ret;
}

int SSLockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
{
    dms_context_t dms_ctx;
    InitDmsContext(&dms_ctx);
    SSBroadcastDDLLock ssmsg;
    ssmsg.type = BCAST_DDLLOCKRELEASE;
    errno_t rc = memcpy_s(&(ssmsg.locktag), sizeof(LOCKTAG), locktag, sizeof(LOCKTAG));
    securec_check(rc, "\0", "\0");
    ssmsg.lockmode = lockmode;
    ssmsg.sessionlock = sessionLock;
    ssmsg.dontWait = false;

    int output_backup = t_thrd.postgres_cxt.whereToSendOutput;
    t_thrd.postgres_cxt.whereToSendOutput = DestNone;
    int ret = dms_broadcast_opengauss_ddllock(&dms_ctx, (char *)&ssmsg, sizeof(SSBroadcastDDLLock),
        (unsigned char)false, SS_BROADCAST_WAIT_FIVE_SECONDS, (unsigned char)LOCK_NORMAL_MODE);
    if (ret != DMS_SUCCESS) {
        ereport(WARNING, (errmsg("[SS broadcast] DDLLockRelease request failed!")));
    }

    t_thrd.postgres_cxt.whereToSendOutput = output_backup;
    return ret;
}

void SSLockReleaseAll()
{
    dms_context_t dms_ctx;
    InitDmsContext(&dms_ctx);
    SSBroadcastCmdOnly ssmsg;
    ssmsg.type = BCAST_DDLLOCKRELEASE_ALL;

    int output_backup = t_thrd.postgres_cxt.whereToSendOutput;
    t_thrd.postgres_cxt.whereToSendOutput = DestNone;
    int ret = dms_broadcast_opengauss_ddllock(&dms_ctx, (char *)&ssmsg, sizeof(SSBroadcastCmdOnly),
        (unsigned char)false, SS_BROADCAST_WAIT_FIVE_SECONDS, (unsigned char)LOCK_RELEASE_SELF);
    if (ret != DMS_SUCCESS) {
        ereport(DEBUG1, (errmsg("[SS broadcast] DDLLockReleaseAll request failed!")));
    }

    t_thrd.postgres_cxt.whereToSendOutput = output_backup;
}

void SSLockAcquireAll()
{
    PROCLOCK *proclock = NULL;
    HASH_SEQ_STATUS seqstat;
    int i;
    for (i = 0; i < NUM_LOCK_PARTITIONS; i++) {
        (void)LWLockAcquire(GetMainLWLockByIndex(FirstLockMgrLock + i), LW_SHARED);
    }

    hash_seq_init(&seqstat, t_thrd.storage_cxt.LockMethodProcLockHash);
    while ((proclock = (PROCLOCK *)hash_seq_search(&seqstat))) {
        if ((proclock->tag.myLock->tag.locktag_type < (uint8)LOCKTAG_PAGE ||
            proclock->tag.myLock->tag.locktag_type == (uint8)LOCKTAG_OBJECT) &&
            (proclock->holdMask & LOCKBIT_ON(AccessExclusiveLock))) {
            LOCK *lock = proclock->tag.myLock;
            int ret = SSLockAcquire(&(lock->tag), AccessExclusiveLock, false, false, LOCK_REACQUIRE);
            if (ret) {
                ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS Broadcast] LockAcquire when reform finished failed")));
            }
        }
    }

    for (i = NUM_LOCK_PARTITIONS; --i >= 0;) {
        LWLockRelease(GetMainLWLockByIndex(FirstLockMgrLock + i));
    }
}

/*
 * Mark the buffer dirty when its DMS control state carries
 * BUF_DIRTY_NEED_FLUSH. Does nothing while extreme redo is active.
 */
void SSCheckBufferIfNeedMarkDirty(Buffer buf)
{
    if (IsExtremeRedo()) {
        return;
    }

    dms_buf_ctrl_t* ctrl = GetDmsBufCtrl(buf - 1);
    bool flush_pending = (ctrl->state & BUF_DIRTY_NEED_FLUSH) != 0;
    if (flush_pending) {
        MarkBufferDirty(buf);
    }
}

void SSOndemandClearRedoDoneState()
{
    for (int buffer = 0; buffer < TOTAL_BUFFER_NUM; buffer++) {
        dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buffer);

        // ondemand redo finish, no need to mark
        if (buf_ctrl->state & BUF_ONDEMAND_REDO_DONE) {
            buf_ctrl->state &= ~BUF_ONDEMAND_REDO_DONE;
        }
    }
}

void SSRecheckBufferPool()
{
    uint64 buf_state;
    for (int i = 0; i < TOTAL_BUFFER_NUM; i++) {
        /*
         * BUF_DIRTY_NEED_FLUSH was removed during mark buffer dirty and lsn_on_disk was set during sync buffer
         * As BUF_DIRTY_NEED_FLUSH was set only if page lsn is bigger than ckpt redo, it should be removed at this time
         * Unfortunately if it is not, mark it dirty again. For lsn_on_disk, if it is still invalid, this means it is
         * not flushed.
         */
        BufferDesc *buf_desc = GetBufferDescriptor(i);
        pg_memory_barrier();
        buf_state = pg_atomic_read_u64(&buf_desc->state);
        if (!(buf_state & BM_VALID || buf_state & BM_TAG_VALID)) {
            continue;
        }

        dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(i);
        if (buf_ctrl->state & BUF_DIRTY_NEED_FLUSH) {
            XLogRecPtr pagelsn = BufferGetLSN(buf_desc);
            int mode = WARNING;
#ifdef USE_ASSERT_CHECKING
            mode = PANIC;
#endif
            ereport(mode,
                (errmsg("[SS][%u/%u/%u/%d %d-%u] Buffer was not flushed or replayed, page lsn (0x%llx)",
                buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
                buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum, (unsigned long long)pagelsn)));
        }
    }
}

/*
 * Ask DMS whether replaying the WAL record at xlogLsn can be skipped for the
 * page held in `buf`.
 *
 * Returns false without asking DMS when the buffer's DMS lock mode is
 * exclusive or when BUF_DIRTY_NEED_FLUSH is not set. Otherwise returns the
 * answer from dms_recovery_page_need_skip(); a DMS failure raises PANIC.
 */
bool CheckPageNeedSkipInRecovery(Buffer buf, uint64 xlogLsn)
{
    dms_buf_ctrl_t *ctrl = GetDmsBufCtrl(buf - 1);
    bool holds_exclusive = (ctrl->lock_mode == DMS_LOCK_EXCLUSIVE);
    if (holds_exclusive || !(ctrl->state & BUF_DIRTY_NEED_FLUSH)) {
        return false;
    }

    /* the page id handed to DMS is the raw buffer tag */
    BufferDesc* desc = GetBufferDescriptor(buf - 1);
    char page_id[DMS_PAGEID_SIZE];
    errno_t copy_rc = memcpy_s(page_id, DMS_PAGEID_SIZE, &(desc->tag), sizeof(BufferTag));
    securec_check(copy_rc, "\0", "\0");

    bool need_skip = false;
    int dms_rc = dms_recovery_page_need_skip(page_id, xlogLsn, (unsigned char *)&need_skip);
    if (dms_rc != DMS_SUCCESS) {
        ereport(PANIC, (errmsg("[SS] DMS Internal error happened during recovery, errno %d", dms_rc)));
    }

    return need_skip;
}

/*
 * Classify the calling thread for DMS page requests.
 *
 * The DMS reform proc, and page-redo / startup threads while the on-demand
 * build is not done, count as recovery sessions; everything else is a normal
 * session.
 */
dms_session_e DMSGetProcType4RequestPage()
{
    bool is_recovery_thread = AmDmsReformProcProcess() ||
        ((AmPageRedoProcess() || AmStartupProcess()) && !SS_ONDEMAND_BUILD_DONE);
    if (!is_recovery_thread) {
        return DMS_SESSION_NORMAL;
    }

    /*
     * When SS double cluster, main standby always is in recovery.
     * When pmState is PM_HOT_STANDBY, this case indicates main standby support to read only. So here
     * DMS_SESSION_RECOVER_HOT_STANDBY will be returned, it indicates that normal threads can access
     * page in recovery state.
     */
    bool hot_standby_main = SS_DISASTER_MAIN_STANDBY_NODE && pmState == PM_HOT_STANDBY;
    return hot_standby_main ? DMS_SESSION_RECOVER_HOT_STANDBY : DMS_SESSION_RECOVER;
}

/*
 * Decide whether a buffer may be evicted.
 *
 * Always true when DMS is disabled. False while an asynchronous DSS I/O is in
 * progress on the buffer, and false for a valid buffer (per `flags`) whose DMS
 * control state still carries BUF_DIRTY_NEED_FLUSH.
 */
bool SSPageCheckIfCanEliminate(BufferDesc* buf_desc, uint64 flags)
{
    if (!ENABLE_DMS) {
        return true;
    }

    if (ENABLE_DSS_AIO && buf_desc->extra->aio_in_progress) {
        return false;
    }

    /*
     * A page produced in the flush_copy phase must not be evicted; it is marked
     * dirty later, when xlog is replayed.
     * SS_IN_FLUSHCOPY is not used for this test because in the recovery phase a
     * page with the BUF_DIRTY_NEED_FLUSH flag can still be found when eviction
     * is attempted.
     */
    dms_buf_ctrl_t *ctrl = GetDmsBufCtrl(buf_desc->buf_id);
    bool must_keep = (flags & BM_VALID) && (ctrl->state & BUF_DIRTY_NEED_FLUSH);
    return !must_keep;
}

/*
 * Read a segment page straight from its physical location into `buffer`.
 *
 * reln:    relation whose segment space is used (opened here if not yet opened).
 * forknum: fork to read from.
 * buffer:  page memory; it is mapped back to its Buffer via BlockGetBuffer().
 *
 * Returns true only if the buffer is valid (BM_VALID), has a known physical location
 * (seg_fileno != EXTENT_INVALID) and the page passes PageIsVerified() after the read.
 * Raises PANIC when the physical location recorded in the DMS buffer control block
 * disagrees with the one recorded in the buffer descriptor.
 */
bool SSSegRead(SMgrRelation reln, ForkNumber forknum, char *buffer)
{
    Buffer buf = BlockGetBuffer(buffer);
    if (BufferIsInvalid(buf)) {
        return false;
    }

    BufferDesc *buf_desc = BufferGetBufferDescriptor(buf);
    bool readable = (pg_atomic_read_u64(&buf_desc->state) & BM_VALID) &&
        buf_desc->extra->seg_fileno != EXTENT_INVALID;
    if (!readable) {
        return false;
    }

    if (reln->seg_space == NULL) {
        reln->seg_space = spc_open(reln->smgr_rnode.node.spcNode, reln->smgr_rnode.node.dbNode, false);
    }
    SegmentCheck(reln->seg_space);

    /* Address the physical segment file that holds the page. */
    RelFileNode fakenode = {
        .spcNode = reln->smgr_rnode.node.spcNode,
        .dbNode = reln->smgr_rnode.node.dbNode,
        .relNode = buf_desc->extra->seg_fileno,
        .bucketNode = SegmentBktId,
        .opt = 0
    };

    /* The location known to the DMS buffer control block, if any, must match the descriptor. */
    dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf - 1);
    bool ctrl_has_location = (buf_ctrl->seg_fileno != EXTENT_INVALID);
    bool location_differs = (buf_ctrl->seg_fileno != buf_desc->extra->seg_fileno ||
        buf_ctrl->seg_blockno != buf_desc->extra->seg_blockno);
    if (ctrl_has_location && location_differs) {
        ereport(PANIC, (errmsg("[SS] It seemd physical location of drc not match with buf desc!")));
    }

    seg_physical_read(reln->seg_space, fakenode, forknum, buf_desc->extra->seg_blockno, (char *)buffer);

    /* A page that fails verification is reported as an unsuccessful read. */
    if (PageIsVerified((Page)buffer, buf_desc->extra->seg_blockno)) {
        return true;
    }
    return false;
}

/*
 * Report whether buffers may currently be accessed through DMS.
 *
 * Returns true as soon as one of the following holds, checked in this order:
 *   1) dms_drc_accessible() reports the page DRC as accessible;
 *   2) the caller requests pages with a DMS_SESSION_RECOVER session;
 *   3) OndemandAllowBufAccess() allows the access.
 */
bool DmsCheckBufAccessible()
{
    if (dms_drc_accessible((uint8)DRC_RES_PAGE_TYPE)) {
        return true;
    }
    if (DMSGetProcType4RequestPage() == DMS_SESSION_RECOVER) {
        return true;
    }
    if (OndemandAllowBufAccess()) {
        return true;
    }
    return false;
}

/*
 * Try to flush one (non-segment) buffer from the calling backend; the flow is copied
 * from BufferAlloc.
 *
 * Returns false without flushing when the backend is not allowed to flush dirty pages,
 * when the content lock cannot be taken immediately in shared mode, or when the
 * double-write path is in use but free_space_enough() reports no room.
 * Returns true after the buffer has been flushed and scheduled for writeback.
 */
bool SSTryFlushBuffer(BufferDesc *buf)
{
    if (!backend_can_flush_dirty_page()) {
        return false;
    }

    if (!LWLockConditionalAcquire(buf->content_lock, LW_SHARED)) {
        return false;
    }

    bool via_double_write =
        dw_enabled() && pg_atomic_read_u32(&g_instance.ckpt_cxt_ctl->current_page_writer_count) > 0;
    if (!via_double_write) {
        FlushBuffer(buf, NULL);
    } else {
        if (!free_space_enough(buf->buf_id)) {
            LWLockRelease(buf->content_lock);
            return false;
        }
        /* Publish the double-write position in dw_pos for the duration of the flush. */
        uint32 dw_slot = first_version_dw_single_flush(buf);
        t_thrd.proc->dw_pos = dw_slot;
        FlushBuffer(buf, NULL);
        g_instance.dw_single_cxt.single_flush_state[dw_slot] = true;
        t_thrd.proc->dw_pos = -1;
    }

    LWLockRelease(buf->content_lock);
    ScheduleBufferTagForWriteback(t_thrd.storage_cxt.BackendWritebackContext, &buf->tag);
    return true;
}

/*
 * Try to flush one segment buffer from the calling backend; the flow is copied from
 * SegBufferAlloc.
 *
 * Returns false when the backend is not allowed to flush dirty pages or the content
 * lock cannot be taken immediately in shared mode; true after the buffer has been
 * flushed and scheduled for writeback.
 */
bool SSTrySegFlushBuffer(BufferDesc* buf)
{
    if (!backend_can_flush_dirty_page()) {
        return false;
    }

    if (!LWLockConditionalAcquire(buf->content_lock, LW_SHARED)) {
        return false;
    }

    /* FlushOneSegmentBuffer is given the 1-based Buffer number, hence buf_id + 1. */
    FlushOneSegmentBuffer(buf->buf_id + 1);
    LWLockRelease(buf->content_lock);
    ScheduleBufferTagForWriteback(t_thrd.storage_cxt.BackendWritebackContext, &buf->tag);
    return true;
}

/*
 * Flush a buffer on behalf of DMS when it is flagged BUF_DIRTY_NEED_FLUSH.
 *
 * Returns:
 *   true  - the buffer does not need this flush (DMS disabled, initdb, flag not set),
 *           or the flag is set and the flush succeeded;
 *   false - the flag is set (or an AIO is in progress on the buffer) and the buffer
 *           could not be flushed.
 *
 * Raises PANIC if the flag is found set while neither SS_IN_REFORM nor
 * SS_IN_ONDEMAND_RECOVERY holds.
 */
bool SSHelpFlushBufferIfNeed(BufferDesc* buf_desc)
{
    if (!ENABLE_DMS || IsInitdb) {
        return true;
    }

    if (ENABLE_DSS_AIO && buf_desc->extra->aio_in_progress) {
        return false;
    }

    dms_buf_ctrl_t *ctrl = GetDmsBufCtrl(buf_desc->buf_id);
    if (!(ctrl->state & BUF_DIRTY_NEED_FLUSH)) {
        return true;
    }

    /* Poll in 1000us steps until dw_init is set. */
    while (!g_instance.dms_cxt.dw_init) {
        pg_usleep(1000L);
    }

    XLogRecPtr pagelsn = BufferGetLSN(buf_desc);
    if (!SS_IN_REFORM && !SS_IN_ONDEMAND_RECOVERY) {
        ereport(PANIC,
            (errmsg("[SS][%u/%u/%u/%d %d-%u] this buffer should not exist with BUF_DIRTY_NEED_FLUSH "
            "but not in reform, page lsn (0x%llx), seg info:%u-%u",
            buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
            buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum,
            (unsigned long long)pagelsn, (unsigned int)buf_desc->extra->seg_fileno,
            buf_desc->extra->seg_blockno)));
    }

    /* Snapshot the phase indicators so the log line reflects the state at flush time. */
    bool flush_copy_phase = SS_IN_FLUSHCOPY;
    bool recovery_running = !g_instance.dms_cxt.SSRecoveryInfo.recovery_pause_flag;
    ereport(LOG,
        (errmsg("[SS][%u/%u/%u/%d %d-%u] flush copy: ready to flush buffer with need flush, "
         "page lsn (0x%llx), seg info:%u-%u, reform phase is in flush_copy:%d, in recovery:%d ",
         buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
         buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum,
         (unsigned long long)pagelsn, (unsigned int)buf_desc->extra->seg_fileno, buf_desc->extra->seg_blockno,
         flush_copy_phase, recovery_running)));

    /* Segment buffers and ordinary buffers are flushed through different paths. */
    if (IsSegmentBufferID(buf_desc->buf_id)) {
        return SSTrySegFlushBuffer(buf_desc);
    }
    return SSTryFlushBuffer(buf_desc);
}

/*
 * Mark a redo buffer dirty when DMS bookkeeping requires it.
 *
 * Nothing is done when DMS is disabled, the redo buffer carries no page, or the redo
 * buffer is already dirty. Otherwise the buffer is made dirty if its DMS control block
 * already carries BUF_ERTO_NEED_MARK_DIRTY, or if any of the following holds (in which
 * case BUF_ERTO_NEED_MARK_DIRTY is also set on the control block):
 *   - the control block carries BUF_DIRTY_NEED_FLUSH;
 *   - CheckPageNeedSkipInRecovery() returns true for this buffer and LSN;
 *   - the descriptor's lsn_on_disk is invalid.
 */
void SSMarkBufferDirtyForERTO(RedoBufferInfo* bufferinfo)
{
    if (!ENABLE_DMS || bufferinfo->pageinfo.page == NULL) {
        return;
    }

    /* Only buffers that are not dirty yet need to be examined. */
    bool not_dirty_yet = !IsRedoBufferDirty(bufferinfo);
    if (!not_dirty_yet) {
        return;
    }

    dms_buf_ctrl_t* ctrl = GetDmsBufCtrl(bufferinfo->buf - 1);
    BufferDesc *desc = GetBufferDescriptor(bufferinfo->buf - 1);

    if (!(ctrl->state & BUF_ERTO_NEED_MARK_DIRTY)) {
        /* Evaluation order matters: the recovery check is skipped once the flag test succeeds. */
        bool must_mark = (ctrl->state & BUF_DIRTY_NEED_FLUSH) ||
            CheckPageNeedSkipInRecovery(bufferinfo->buf, bufferinfo->lsn) ||
            XLogRecPtrIsInvalid(desc->extra->lsn_on_disk);
        if (!must_mark) {
            return;
        }
        ctrl->state |= BUF_ERTO_NEED_MARK_DIRTY;
    }

    MakeRedoBufferDirty(bufferinfo);
}

/*
 * Acquire an LWLock with a bounded wait.
 *
 * lock: the LWLock to acquire.
 * mode: requested lock mode.
 *
 * The wait limit handed to LWLockAcquireTimeout() is 500 when SS_PRIMARY_MODE holds
 * and 5000 otherwise; the warning text reports this value in milliseconds.
 *
 * Returns true when the lock was obtained (the caller must release it). On timeout a
 * WARNING tagged with the DMS module is logged and false is returned.
 */
bool SSLWLockAcquireTimeout(LWLock* lock, LWLockMode mode)
{
    int wait_tickets = (SS_PRIMARY_MODE) ? 500 : 5000;

    bool get_lock = LWLockAcquireTimeout(lock, mode, wait_tickets);
    if (!get_lock) {
        /*
         * MOD_DMS is a module id, not a SQLSTATE error code, so it belongs in
         * errmodule() (as the other DMS log calls in this file do) rather than errcode().
         */
        ereport(WARNING, (errmodule(MOD_DMS), (errmsg("[SS lwlock] request LWLock:%p timeout, LWLockMode:%d, timeout:%dms",
            lock, mode, wait_tickets))));
    }
    return get_lock;
}

/*
 * Wait for an in-progress I/O on a buffer to finish, giving up when the
 * io_in_progress_lock cannot be obtained within SSLWLockAcquireTimeout()'s limit.
 *
 * Returns true once BM_IO_IN_PROGRESS is observed clear in the buffer header state;
 * returns false (after logging a WARNING) when a lock wait timed out.
 */
bool SSWaitIOTimeout(BufferDesc *buf)
{
    for (;;) {
        /* Take the header lock only long enough to snapshot the state word. */
        uint64 state_snapshot = LockBufHdr(buf);
        UnlockBufHdr(buf, state_snapshot);

        if (!(state_snapshot & BM_IO_IN_PROGRESS)) {
            return true;
        }

        /* Acquire and immediately release the I/O lock, then re-check the state. */
        if (!SSLWLockAcquireTimeout(buf->io_in_progress_lock, LW_SHARED)) {
            break;
        }
        LWLockRelease(buf->io_in_progress_lock);
    }

    /* Only reached when the lock wait timed out. */
    BufferTag *tag = &buf->tag;
    ereport(WARNING, (errmodule(MOD_DMS), (errmsg("[SS lwlock][%u/%u/%u/%d %d-%u] SSWaitIOTimeout, "
        "buf_id:%d, io_in_progress_lock:%p",
        tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
        tag->forkNum, tag->blockNum, buf->buf_id, buf->io_in_progress_lock))));
    return false;
}

/*
 * Read the page identified by `tag` into the buffer pool and return its Buffer.
 *
 * Tags whose rnode is a segment physical rel node are read through ReadBufferFast()
 * on the opened segment space; all other tags go through ReadBufferWithoutRelcache().
 */
Buffer SSReadBuffer(BufferTag *tag, ReadBufferMode mode)
{
    if (IsSegmentPhysicalRelNode(tag->rnode)) {
        SegSpace *seg_space = spc_open(tag->rnode.spcNode, tag->rnode.dbNode, false, false);
        return ReadBufferFast(seg_space, tag->rnode, tag->forkNum, tag->blockNum, mode);
    }

    return ReadBufferWithoutRelcache(tag->rnode, tag->forkNum, tag->blockNum, mode, NULL, NULL);
}

/*
 * Release a buffer, using SegReleaseBuffer() when is_seg is true and
 * ReleaseBuffer() otherwise.
 */
void DmsReleaseBuffer(int buffer, bool is_seg)
{
    if (!is_seg) {
        ReleaseBuffer(buffer);
        return;
    }
    SegReleaseBuffer(buffer);
}

/*
 * Tell whether a page request should be abandoned because reform has failed.
 *
 * Returns true when the caller is the DMS reform thread, a page-redo thread or the
 * startup thread and dms_reform_failed() reports failure; false otherwise.
 * The buf_ctrl argument is not used by this function.
 */
bool SSNeedTerminateRequestPageInReform(dms_buf_ctrl_t *buf_ctrl)
{
    bool is_reform_thread = AmDmsReformProcProcess();
    if (is_reform_thread && dms_reform_failed()) {
        return true;
    }

    bool is_replay_thread = (AmPageRedoProcess() || AmStartupProcess());
    if (is_replay_thread && dms_reform_failed()) {
        return true;
    }

    return false;
}

/*
 * Tell whether a backend should give up requesting buf_desc's page while the primary
 * node is restarting.
 *
 * The check applies only when SS_AM_BACKENDS_WORKERS and SS_STANDBY_IN_PRIMARY_RESTART
 * hold, the requested page belongs to a segment physical rel node, and
 * DmsCheckBufAccessible() reports buffers as not accessible. In that situation the
 * function returns true if this thread holds, in exclusive mode, the io-in-progress
 * lock of some other buffer whose tag is a segment file node but not a segment
 * physical rel node. Otherwise it returns false.
 */
bool SSNeedTerminateRequestPageInPrimaryRestart(BufferDesc *buf_desc)
{
    if (!SS_AM_BACKENDS_WORKERS || !SS_STANDBY_IN_PRIMARY_RESTART ||
        !IsSegmentPhysicalRelNode(buf_desc->tag.rnode) || DmsCheckBufAccessible()) {
        return false;
    }

    for (int idx = 0; idx < t_thrd.storage_cxt.num_held_lwlocks; idx++) {
        LWLock *held = t_thrd.storage_cxt.held_lwlocks[idx].lock;

        /* Only io-in-progress locks of *other* buffers, held exclusively, are of interest. */
        if (held == NULL || held->tranche != LWTRANCHE_BUFFER_IO_IN_PROGRESS) {
            continue;
        }
        if (held->tag == buf_desc->buf_id) {
            continue;
        }
        if (t_thrd.storage_cxt.held_lwlocks[idx].mode != LW_EXCLUSIVE) {
            continue;
        }

        /* The lock tag is used as a buffer id; + 1 converts it to a Buffer number. */
        BufferDesc *other_desc = BufferGetBufferDescriptor(held->tag + 1);
        bool logical_seg_page = IsSegmentFileNode(other_desc->tag.rnode) &&
            !IsSegmentPhysicalRelNode(other_desc->tag.rnode);
        if (!logical_seg_page) {
            continue;
        }

        ereport(LOG, (errmodule(MOD_DMS),
            (errmsg("[SS][%u/%u/%u/%d %d-%u] Backend need to terminate request page during primary node"
                    " restart, buf_id:%d.",
                    buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode,
                    buf_desc->tag.rnode.relNode, buf_desc->tag.rnode.bucketNode,
                    buf_desc->tag.forkNum, buf_desc->tag.blockNum, buf_desc->buf_id))));
        return true;
    }

    return false;
}

/*
 * Pin a buffer, using SegPinBuffer() for segment buffers and PinBuffer() otherwise.
 * For the non-segment path the current resource owner's buffer array is enlarged
 * before pinning. Returns whatever the underlying pin routine returns.
 */
bool SSPinBuffer(BufferDesc *buf_desc)
{
    bool is_seg_buffer = IsSegmentBufferID(buf_desc->buf_id);
    if (!is_seg_buffer) {
        ResourceOwnerEnlargeBuffers(t_thrd.utils_cxt.CurrentResourceOwner);
        return PinBuffer(buf_desc, NULL);
    }
    return SegPinBuffer(buf_desc);
}

/*
 * Unpin a buffer, using SegUnpinBuffer() for segment buffers and
 * UnpinBuffer(buf_desc, true) otherwise.
 */
void SSUnPinBuffer(BufferDesc *buf_desc)
{
    bool is_seg_buffer = IsSegmentBufferID(buf_desc->buf_id);
    if (!is_seg_buffer) {
        UnpinBuffer(buf_desc, true);
        return;
    }
    SegUnpinBuffer(buf_desc);
}

/*
 * Check whether a page obtained on an SS standby node was pruned by the master node,
 * by comparing a previously saved line-pointer offset with the current lp_off of the
 * same line pointer.
 *
 * buf:       used here as a zero-based buffer id (the page is fetched with buf + 1 and
 *            the descriptor with buf).
 * lp_offset: line pointer to inspect on the page.
 * saved_off: the lp_off value recorded earlier.
 *
 * Returns true when the current lp_off differs from saved_off, false otherwise.
 * Note: the function needs to be removed after ustore is used in kernel.
 */
static bool LineOffChanged(Buffer buf, int lp_offset, uint16 saved_off)
{
    Page target_page = BufferGetPage(buf + 1);
    ItemId lpp = HeapPageGetItemId(target_page, lp_offset);
    if (lpp->lp_off == saved_off) {
        return false;
    }

#ifdef USE_ASSERT_CHECKING
    /* Emit the debug line with whereToSendOutput set to DestNone, then restore it. */
    int saved_dest = t_thrd.postgres_cxt.whereToSendOutput;
    t_thrd.postgres_cxt.whereToSendOutput = DestNone;
    BufferDesc *buf_desc = GetBufferDescriptor(buf);
    ereport(DEBUG1, (errmsg("[%d/%d/%d/%d/%d %d-%d] lineoffchanged new %d/%d/%d old %d!",
        buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
        (int)buf_desc->tag.rnode.bucketNode, (int)buf_desc->tag.rnode.opt, buf_desc->tag.forkNum,
        buf_desc->tag.blockNum, lpp->lp_off, lpp->lp_flags, lpp->lp_len, saved_off)));
    t_thrd.postgres_cxt.whereToSendOutput = saved_dest;
#endif
    return true;
}

/*
 * Drop the calling thread's pincount_array entry for a buffer and decrement the
 * buffer's DMS pinned_count.
 *
 * buf_id: the Buffer number (1-based; buf_id - 1 indexes the DMS buffer control array).
 *
 * If the thread's need_check_pincount flag is set, the recorded line pointer of the
 * entry has changed (see LineOffChanged) and the buffer control block's
 * need_check_pincount is set, the entry is cleared and an ERROR asking the user to
 * requery is raised. In every non-error path that reaches the end of the function the
 * thread's need_check_pincount flag is reset.
 * Note: the function needs to be removed after ustore is used in kernel.
 */
void ForgetBufferNeedCheckPin(Buffer buf_id)
{
    if (t_thrd.dms_cxt.pincount_array == NULL) {
        return;
    }

    for (int slot = 0; slot < REFCOUNT_ARRAY_ENTRIES; slot++) {
        if (t_thrd.dms_cxt.pincount_array[slot].bufid != (uint32)buf_id) {
            continue;
        }

        /* Found this buffer's entry: free the slot first. */
        t_thrd.dms_cxt.pincount_array[slot].bufid = InvalidBuffer;
        dms_buf_ctrl_t *ctrl = GetDmsBufCtrl(buf_id - 1);
        if (pg_atomic_read_u32(&ctrl->pinned_count) > 0) {
            uint32 count = pg_atomic_sub_fetch_u32(&(ctrl->pinned_count), 1);
#ifdef USE_ASSERT_CHECKING
            BufferDesc *buf_desc = GetBufferDescriptor(buf_id - 1);
            int saved_dest = t_thrd.postgres_cxt.whereToSendOutput;
            t_thrd.postgres_cxt.whereToSendOutput = DestNone;
            ereport(DEBUG1, (errmsg("[%d/%d/%d/%d/%d %d-%d] ForgetBufferNeedCheckPin %d!",
                buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
                (int)buf_desc->tag.rnode.bucketNode, (int)buf_desc->tag.rnode.opt, buf_desc->tag.forkNum,
                buf_desc->tag.blockNum, count)));
            t_thrd.postgres_cxt.whereToSendOutput = saved_dest;
#endif
            /* The line pointer is only inspected when this thread asked for the check. */
            bool line_changed = t_thrd.dms_cxt.need_check_pincount &&
                LineOffChanged(buf_id - 1, t_thrd.dms_cxt.pincount_array[slot].lp_offset,
                               t_thrd.dms_cxt.pincount_array[slot].saved_off);
            if (line_changed && ctrl->need_check_pincount) {
                if (pg_atomic_read_u32(&(ctrl->pinned_count)) == 0) {
                    ctrl->need_check_pincount = false;
                }
                /* Clear all per-thread state before ERROR transfers control away. */
                t_thrd.dms_cxt.need_check_pincount = false;
                t_thrd.dms_cxt.pincount_array[slot].lp_offset = 0;
                t_thrd.dms_cxt.pincount_array[slot].saved_off = 0;
                ereport(ERROR, (errmsg("The version of page is too old, please requery!")));
            }
        }

        t_thrd.dms_cxt.pincount_array[slot].lp_offset = 0;
        t_thrd.dms_cxt.pincount_array[slot].saved_off = 0;
        break;
    }

    t_thrd.dms_cxt.need_check_pincount = false;
}