/*
 * Copyright (c) 2022 Huawei Technologies Co.,Ltd.
 *
 * DMS is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * dcs_tran.c
 *
 *
 * IDENTIFICATION
 *    src/dcs/dcs_tran.c
 *
 * -------------------------------------------------------------------------
 */

#include "dcs_tran.h"
#include "dcs_msg.h"
#include "dms.h"
#include "dms_msg.h"
#include "dms_msg_command.h"
#include "dms_msg_protocol.h"
#include "drc.h"
#include "drc_tran.h"
#include "dms_stat.h"
#include "dms_error.h"

int dms_request_opengauss_txn_status(dms_context_t *dms_ctx, unsigned char request, unsigned char *result)
{
    dms_reset_error();
    msg_opengauss_txn_status_request_t status_req;
    dms_message_head_t *head = &status_req.head;
    dms_xid_ctx_t *xid_ctx = &dms_ctx->xid_ctx;
    dms_message_t receive_msg = { 0 };

    DMS_INIT_MESSAGE_HEAD(head, MSG_REQ_OPENGAUSS_TXN_STATUS, 0, dms_ctx->inst_id, xid_ctx->inst_id,
        (uint16)dms_ctx->sess_id, CM_INVALID_ID16);
    status_req.xid = xid_ctx->xid;
    status_req.request_type = request;

    head->size = (uint16)sizeof(msg_opengauss_txn_status_request_t);

    // openGauss has not adapted stats yet
    dms_begin_stat(dms_ctx->sess_id, DMS_EVT_TXN_REQ_INFO, CM_TRUE);

    int32 ret = mfc_send_data(head);
    if (ret != CM_SUCCESS) {
        dms_end_stat(dms_ctx->sess_id);
        LOG_DEBUG_ERR("[TXN] send message to instance(%u) failed, cmd(%u) ruid(%llu) errcode(%u)",
            (uint32)xid_ctx->inst_id, (uint32)MSG_REQ_OPENGAUSS_TXN_STATUS, head->ruid, (uint32)ret);
        return ret;
    }

    ret = mfc_get_response(head->ruid, &receive_msg, DMS_WAIT_MAX_TIME);
    if (ret != CM_SUCCESS) {
        dms_end_stat(dms_ctx->sess_id);
        LOG_DEBUG_ERR("[TXN] receive message to instance(%u) failed, cmd(%u) ruid(%llu) errcode(%u)",
            (uint32)xid_ctx->inst_id, (uint32)MSG_REQ_OPENGAUSS_TXN_STATUS, head->ruid, (uint32)ret);
        DMS_RETURN_IF_PROTOCOL_COMPATIBILITY_ERROR(ret);
        return ret;
    }

    dms_end_stat(dms_ctx->sess_id);

    CM_CHK_RESPONSE_SIZE(&receive_msg, (uint32)(sizeof(dms_message_head_t) + sizeof(bool8)), CM_FALSE);
    *result = *(bool8 *)(receive_msg.buffer + sizeof(dms_message_head_t));
    mfc_release_response(&receive_msg);
    return DMS_SUCCESS;
}

void dcs_proc_opengauss_txn_status_req(dms_process_context_t *process_ctx, dms_message_t *receive_msg)
{
    dms_message_head_t *req_head = receive_msg->head;
    dms_message_head_t ack_head;

    CM_CHK_PROC_MSG_SIZE_NO_ERR(receive_msg, (uint32)sizeof(msg_opengauss_txn_status_request_t), CM_TRUE);
    msg_opengauss_txn_status_request_t *status_req = (msg_opengauss_txn_status_request_t *)(receive_msg->buffer);

    uint64 xid = status_req->xid;
    unsigned char req_type = status_req->request_type;
    bool8 result;

    int ret = g_dms.callback.get_opengauss_txn_status(process_ctx->db_handle, xid, req_type, &result);
    if (ret != DMS_SUCCESS) {
        DMS_THROW_ERROR(ERRNO_DMS_DCS_GET_TXN_STATUS_FAILED, ret);
        cm_ack_result_msg(process_ctx, receive_msg, MSG_ACK_ERROR, ret);
        return;
    }

    dms_init_ack_head2(&ack_head, MSG_ACK_OPENGAUSS_TXN_STATUS, 0, req_head->dst_inst, req_head->src_inst,
        (uint16)process_ctx->sess_id, req_head->src_sid, req_head->msg_proto_ver);
    ack_head.size = (uint16)(sizeof(uint64) + sizeof(dms_message_head_t));
    ack_head.ruid = req_head->ruid;

    if (mfc_send_data3(&ack_head, sizeof(dms_message_head_t), &result) != CM_SUCCESS) {
        LOG_DEBUG_ERR("[TXN] send openGauss txn status ack message failed, src_inst = %u, dst_inst = %u",
            (uint32)ack_head.src_inst, (uint32)ack_head.dst_inst);
    }
}

int dms_request_opengauss_update_xid(dms_context_t *dms_ctx, unsigned short t_infomask,
    unsigned short t_infomask2, unsigned long long *uxid)
{
    dms_reset_error();
    msg_opengauss_update_xid_request_t uxid_req;
    dms_message_head_t *head = &uxid_req.head;
    dms_xid_ctx_t *xid_ctx = &dms_ctx->xid_ctx;
    dms_message_t receive_msg = { 0 };

    DMS_INIT_MESSAGE_HEAD(head, MSG_REQ_OPENGAUSS_TXN_UPDATE_XID, 0, dms_ctx->inst_id, xid_ctx->inst_id,
        (uint16)dms_ctx->sess_id, CM_INVALID_ID16);
    uxid_req.xid = xid_ctx->xid;
    uxid_req.t_infomask = t_infomask;
    uxid_req.t_infomask2 = t_infomask2;

    head->size = (uint16)sizeof(msg_opengauss_update_xid_request_t);

    // openGauss has not adapted stats yet
    dms_begin_stat(dms_ctx->sess_id, DMS_EVT_TXN_REQ_INFO, CM_TRUE);

    int32 ret = mfc_send_data(head);
    if (ret != CM_SUCCESS) {
        dms_end_stat(dms_ctx->sess_id);
        LOG_DEBUG_ERR("[TXN] send message to instance(%u) failed, cmd(%u) ruid(%llu) errcode(%u)",
            (uint32)xid_ctx->inst_id, (uint32)MSG_REQ_OPENGAUSS_TXN_UPDATE_XID, head->ruid, (uint32)ret);
        return ret;
    }

    ret = mfc_get_response(head->ruid, &receive_msg, DMS_WAIT_MAX_TIME);
    if (ret != CM_SUCCESS) {
        dms_end_stat(dms_ctx->sess_id);
        LOG_DEBUG_ERR("[TXN] receive message to instance(%u) failed, cmd(%u) ruid(%llu) errcode(%u)",
            (uint32)xid_ctx->inst_id, (uint32)MSG_REQ_OPENGAUSS_TXN_UPDATE_XID, head->ruid, (uint32)ret);
        return ret;
    }

    dms_end_stat(dms_ctx->sess_id);

    CM_CHK_RESPONSE_SIZE(&receive_msg, (uint32)(sizeof(dms_message_head_t) + sizeof(uint64)), CM_FALSE);
    *uxid = *(uint64 *)(receive_msg.buffer + sizeof(dms_message_head_t));
    mfc_release_response(&receive_msg);
    return DMS_SUCCESS;
}

void dcs_proc_opengauss_update_xid_req(dms_process_context_t *process_ctx, dms_message_t *receive_msg)
{
    dms_message_head_t *req_head = receive_msg->head;
    dms_message_head_t ack_head;

    CM_CHK_PROC_MSG_SIZE_NO_ERR(receive_msg, (uint32)sizeof(msg_opengauss_update_xid_request_t), CM_TRUE);
    msg_opengauss_update_xid_request_t *uxid_req = (msg_opengauss_update_xid_request_t *)(receive_msg->buffer);

    uint64 uxid;
    uint64 xid = uxid_req->xid;
    uint16 t_infomask = uxid_req->t_infomask;
    uint16 t_infomask2 = uxid_req->t_infomask2;

    int ret = g_dms.callback.get_opengauss_update_xid(process_ctx->db_handle, xid, t_infomask, t_infomask2, &uxid);
    if (ret != DMS_SUCCESS) {
        DMS_THROW_ERROR(ERRNO_DMS_DCS_GET_UPDATE_XID_FAILED, ret);
        cm_ack_result_msg(process_ctx, receive_msg, MSG_ACK_ERROR, ret);
        return;
    }

    dms_init_ack_head2(&ack_head, MSG_ACK_OPENGAUSS_TXN_UPDATE_XID, 0, req_head->dst_inst, req_head->src_inst,
        (uint16)process_ctx->sess_id, req_head->src_sid, req_head->msg_proto_ver);
    ack_head.size = (uint16)(sizeof(uint64) + sizeof(dms_message_head_t));
    ack_head.ruid = req_head->ruid;

    if (mfc_send_data3(&ack_head, sizeof(dms_message_head_t), &uxid) != CM_SUCCESS) {
        LOG_DEBUG_ERR("[TXN] send openGauss txn update xid ack message failed, src_inst = %u, dst_inst = %u",
            (uint32)ack_head.src_inst, (uint32)ack_head.dst_inst);
    }
}

int dms_request_opengauss_xid_csn(dms_context_t *dms_ctx, dms_opengauss_xid_csn_t *dms_txn_info,
    dms_opengauss_csn_result_t *xid_csn_result)
{
    dms_reset_error();
    msg_opengauss_xid_csn_request_t xid_csn_req;
    dms_message_head_t *head = &xid_csn_req.head;
    dms_xid_ctx_t *xid_ctx = &dms_ctx->xid_ctx;
    dms_message_t receive_msg = { 0 };

    DMS_INIT_MESSAGE_HEAD(head, MSG_REQ_OPENGAUSS_XID_CSN, 0, (uint8)dms_ctx->inst_id, (uint8)xid_ctx->inst_id,
        (uint16)dms_ctx->sess_id, CM_INVALID_ID16);
    xid_csn_req.xid_csn_ctx = *dms_txn_info;
    head->size = (uint16)sizeof(msg_opengauss_xid_csn_request_t);

    dms_begin_stat(dms_ctx->sess_id, DMS_EVT_TXN_REQ_INFO, CM_TRUE);

    int32 ret = mfc_send_data(head);
    if (ret != CM_SUCCESS) {
        dms_end_stat(dms_ctx->sess_id);

        LOG_DEBUG_ERR("[TXN] send message to instance(%hhu) failed, cmd(%d) ruid(%llu) errcode(%d)",
            xid_ctx->inst_id, MSG_REQ_OPENGAUSS_XID_CSN, head->ruid, ret);
        return ret;
    }

    ret = mfc_get_response(head->ruid, &receive_msg, DMS_WAIT_MAX_TIME);
    if (ret != CM_SUCCESS) {
        dms_end_stat(dms_ctx->sess_id);

        LOG_DEBUG_ERR("[TXN] receive message to instance(%hhu) failed, cmd(%d) ruid(%llu) errcode(%d)",
            xid_ctx->inst_id, MSG_REQ_OPENGAUSS_XID_CSN, head->ruid, ret);
        return ret;
    }

    dms_end_stat(dms_ctx->sess_id);

    CM_CHK_RESPONSE_SIZE(&receive_msg,
        (uint32)(sizeof(dms_message_head_t) + sizeof(dms_opengauss_csn_result_t)), CM_FALSE);
    errno_t err = memcpy_s(xid_csn_result, sizeof(dms_opengauss_csn_result_t),
        (receive_msg.buffer + sizeof(dms_message_head_t)), sizeof(dms_opengauss_csn_result_t));
    if (err != EOK) {
        LOG_DEBUG_ERR("[TXN] memcpy_s failed, errno = %d", err);
        DMS_THROW_ERROR(ERRNO_DMS_SECUREC_CHECK_FAIL);
        mfc_release_response(&receive_msg);
        return ERRNO_DMS_SECUREC_CHECK_FAIL;
    }

    mfc_release_response(&receive_msg);
    return DMS_SUCCESS;
}

void dcs_proc_opengauss_xid_csn_req(dms_process_context_t *process_ctx, dms_message_t *receive_msg)
{
    dms_message_head_t *req_head = receive_msg->head;
    dms_message_head_t ack_head;

    CM_CHK_PROC_MSG_SIZE_NO_ERR(receive_msg, (uint32)sizeof(msg_opengauss_xid_csn_request_t), CM_TRUE);
    msg_opengauss_xid_csn_request_t *xid_csn_req = (msg_opengauss_xid_csn_request_t *)(receive_msg->buffer);
    dms_opengauss_xid_csn_t xid_csn = xid_csn_req->xid_csn_ctx;
    dms_opengauss_csn_result_t csn_result = { 0 };

    int ret = g_dms.callback.get_opengauss_xid_csn(process_ctx->db_handle, &xid_csn, &csn_result);
    if (ret != DMS_SUCCESS) {
        DMS_THROW_ERROR(ERRNO_DMS_DCS_GET_XID_CSN_FAILED, ret);
        cm_ack_result_msg(process_ctx, receive_msg, MSG_ACK_ERROR, ret);
        return;
    }

    dms_init_ack_head2(&ack_head, MSG_ACK_OPENGAUSS_XID_CSN, 0, req_head->dst_inst, req_head->src_inst,
        (uint16)process_ctx->sess_id, req_head->src_sid, req_head->msg_proto_ver);
    ack_head.size = (uint16)(sizeof(dms_opengauss_xid_csn_t) + sizeof(dms_message_head_t));
    ack_head.ruid = req_head->ruid;

    if (mfc_send_data3(&ack_head, sizeof(dms_message_head_t), &csn_result) != CM_SUCCESS) {
        LOG_DEBUG_ERR("[TXN] send openGauss xid csn ack message failed, src_inst = %u, dst_inst = %u",
            (uint32)ack_head.src_inst, (uint32)ack_head.dst_inst);
    }
}

void dcs_proc_txn_info_req(dms_process_context_t *process_ctx, dms_message_t *receive_msg)
{
#ifdef OPENGAUSS
    /* pass */
#else
    dms_message_head_t *req_head = receive_msg->head;
    dms_message_head_t ack_head;
    dms_txn_info_t txn_info = { 0 };

    uint32 total_size = (uint32)(sizeof(msg_txn_info_request_t));
    CM_CHK_PROC_MSG_SIZE_NO_ERR(receive_msg, total_size, CM_FALSE);
    msg_txn_info_request_t *txn_info_req = (msg_txn_info_request_t *)(receive_msg->buffer);

    /* sync SCN */
    if (process_ctx->db_handle != NULL) {
        g_dms.callback.update_global_scn(process_ctx->db_handle, txn_info_req->scn);
    }

    int ret = g_dms.callback.get_txn_info(process_ctx->db_handle, txn_info_req->xid,
        (bool8)txn_info_req->is_scan, &txn_info);
    if (ret != DMS_SUCCESS) {
        cm_send_error_msg(req_head, ERRNO_DMS_CALLBACK_GET_TXN_INFO, "get txn info failed");
        return;
    }

    dms_init_ack_head2(&ack_head, MSG_ACK_TXN_INFO, 0, req_head->dst_inst, req_head->src_inst,
        (uint16)process_ctx->sess_id, req_head->src_sid, req_head->msg_proto_ver);
    ack_head.size = (uint16)(sizeof(dms_txn_info_t) + sizeof(dms_message_head_t));
    ack_head.ruid = req_head->ruid;
    DDES_FAULT_INJECTION_CALL(DMS_FI_ACK_TXN_INFO, MSG_ACK_TXN_INFO);
    if (mfc_send_data3(&ack_head, sizeof(dms_message_head_t), &txn_info) != CM_SUCCESS) {
        LOG_DEBUG_ERR("[TXN] send txn info ack message failed, src_inst = %u, dst_inst = %u",
            (uint32)ack_head.src_inst, (uint32)ack_head.dst_inst);
    }
#endif
}

int dms_request_txn_info(dms_context_t *dms_ctx, dms_txn_info_t *dms_txn_info)
{
    dms_reset_error();
    msg_txn_info_request_t txn_info_req;
    dms_message_head_t *head = &txn_info_req.head;
    dms_xid_ctx_t *xid_ctx = &dms_ctx->xid_ctx;
    dms_message_t receive_msg = { 0 };

    DMS_INIT_MESSAGE_HEAD(head, MSG_REQ_TXN_INFO, 0, dms_ctx->inst_id, xid_ctx->inst_id,
        (uint16)dms_ctx->sess_id, CM_INVALID_ID16);
    txn_info_req.xid = xid_ctx->xid;
#ifndef OPENGAUSS
    /* sync SCN to remote instance */
    txn_info_req.scn = (dms_ctx->db_handle != NULL) ? g_dms.callback.get_global_scn(dms_ctx->db_handle) : 0;
#endif
    txn_info_req.is_scan = xid_ctx->is_scan;
    head->size = (uint16)sizeof(txn_info_req);

    dms_begin_stat(dms_ctx->sess_id, DMS_EVT_TXN_REQ_INFO, CM_TRUE);
    DDES_FAULT_INJECTION_CALL(DMS_FI_REQ_TXN_INFO, MSG_REQ_TXN_INFO);
    int32 ret = mfc_send_data(head);
    if (ret != CM_SUCCESS) {
        dms_end_stat(dms_ctx->sess_id);

        LOG_DEBUG_ERR("[TXN] send message to instance(%u) failed, cmd(%u) ruid(%llu) errcode(%d)",
            (uint32)xid_ctx->inst_id, (uint32)MSG_REQ_TXN_INFO, head->ruid, ret);
        DMS_THROW_ERROR(ERRNO_DMS_SEND_MSG_FAILED, ret, MSG_REQ_TXN_INFO, xid_ctx->inst_id);
        return ERRNO_DMS_SEND_MSG_FAILED;
    }

    ret = mfc_get_response(head->ruid, &receive_msg, DMS_WAIT_MAX_TIME);
    if (ret != CM_SUCCESS) {
        dms_end_stat(dms_ctx->sess_id);

        LOG_DEBUG_ERR("[TXN] receive message from instance(%u) failed, cmd(%u) ruid(%llu) errcode(%d)",
            (uint32)xid_ctx->inst_id, (uint32)MSG_REQ_TXN_INFO, head->ruid, ret);
        DMS_RETURN_IF_PROTOCOL_COMPATIBILITY_ERROR(ret);
        DMS_THROW_ERROR(ERRNO_DMS_RECV_MSG_FAILED, ret);
        return ERRNO_DMS_RECV_MSG_FAILED;
    }

    dms_end_stat(dms_ctx->sess_id);

    CM_CHK_RESPONSE_SIZE(&receive_msg,
        (uint32)(sizeof(dms_message_head_t) + sizeof(dms_txn_info_t)), CM_FALSE);
    errno_t err = memcpy_s(dms_txn_info, sizeof(dms_txn_info_t),
        (receive_msg.buffer + sizeof(dms_message_head_t)), sizeof(dms_txn_info_t));
    if (err != EOK) {
        LOG_DEBUG_ERR("[TXN] memcpy_s failed, errno = %d", err);
        DMS_THROW_ERROR(ERRNO_DMS_SECUREC_CHECK_FAIL);
        mfc_release_response(&receive_msg);
        return ERRNO_DMS_SECUREC_CHECK_FAIL;
    }

    mfc_release_response(&receive_msg);
    return DMS_SUCCESS;
}

void dcs_proc_opengauss_txn_snapshot_req(dms_process_context_t *process_ctx, dms_message_t *receive_msg)
{
    dms_message_head_t ack;
    dms_opengauss_txn_snapshot_t txn_snapshot = { 0 };

    uint8 src_inst = receive_msg->head->src_inst;
    int32 ret = g_dms.callback.get_opengauss_txn_snapshot(process_ctx->db_handle, &txn_snapshot, src_inst);
    if (ret == DMS_SUCCESS) {
        dms_init_ack_head2(&ack, MSG_ACK_OPENGAUSS_TXN_SNAPSHOT, 0, receive_msg->head->dst_inst,
            receive_msg->head->src_inst, (uint16)process_ctx->sess_id, receive_msg->head->src_sid,
            receive_msg->head->msg_proto_ver);
        ack.ruid = receive_msg->head->ruid;
        ack.size = (uint16)(sizeof(dms_message_head_t) + sizeof(dms_opengauss_txn_snapshot_t));
        (void)mfc_send_data3(&ack, sizeof(dms_message_head_t), &txn_snapshot);
    } else {
        cm_ack_result_msg(process_ctx, receive_msg, MSG_ACK_ERROR, ret);
    }
}

void dcs_proc_opengauss_txn_of_master_req(dms_process_context_t *process_ctx, dms_message_t *receive_msg)
{
#ifdef OPENGAUSS
    dms_opengauss_txn_sw_info_t dms_swinfo = { 0 };

    uint32 total_size = (uint32)(sizeof(dms_message_head_t) + sizeof(uint32));
    CM_CHK_PROC_MSG_SIZE_NO_ERR(receive_msg, total_size, CM_FALSE);
    dms_swinfo.server_proc_slot = *(uint32 *)(receive_msg->buffer + sizeof(dms_message_head_t));

    dms_message_head_t ack;
    int32 ret = g_dms.callback.get_opengauss_txn_of_master(process_ctx->db_handle, &dms_swinfo);
    if (ret == DMS_SUCCESS) {
        dms_init_ack_head2(&ack, MSG_ACK_OPENGAUSS_TXN_SWINFO, 0, receive_msg->head->dst_inst,
            receive_msg->head->src_inst, (uint16)process_ctx->sess_id, receive_msg->head->src_sid,
            receive_msg->head->msg_proto_ver);
        ack.ruid = receive_msg->head->ruid;
        ack.size = (uint16)(sizeof(dms_message_head_t) + sizeof(dms_opengauss_txn_sw_info_t));
        (void)mfc_send_data3(&ack, sizeof(dms_message_head_t), &dms_swinfo);
    } else {
        cm_ack_result_msg(process_ctx, receive_msg, MSG_ACK_ERROR, ret);
    }
#endif
}

void dcs_proc_txn_snapshot_req(dms_process_context_t *process_ctx, dms_message_t *receive_msg)
{
#ifdef OPENGAUSS
    /* pass */
#else
    dms_message_head_t ack;
    dms_txn_snapshot_t txn_snapshot = { 0 };

    CM_CHK_PROC_MSG_SIZE_NO_ERR(receive_msg, (uint32)sizeof(msg_txn_snapshot_t), CM_TRUE);
    msg_txn_snapshot_t *req = (msg_txn_snapshot_t *)receive_msg->buffer;
    uint32 xmap = req->xmap;
    int32 ret = g_dms.callback.get_txn_snapshot(process_ctx->db_handle, xmap, &txn_snapshot);
    if (ret == DMS_SUCCESS) {
        dms_init_ack_head2(&ack, MSG_ACK_TXN_SNAPSHOT, 0, receive_msg->head->dst_inst, receive_msg->head->src_inst,
            (uint16)process_ctx->sess_id, (uint16)receive_msg->head->src_sid, req->head.msg_proto_ver);
        ack.ruid = receive_msg->head->ruid;
        ack.size = (uint16)(sizeof(dms_message_head_t) + sizeof(dms_txn_snapshot_t));
        DDES_FAULT_INJECTION_CALL(DMS_FI_ACK_TXN_SNAPSHOT, MSG_ACK_TXN_SNAPSHOT);
        (void)mfc_send_data3(&ack, sizeof(dms_message_head_t), &txn_snapshot);
    } else {
        cm_ack_result_msg(process_ctx, receive_msg, MSG_ACK_ERROR, ret);
    }
#endif
}

int dms_request_opengauss_txn_snapshot(dms_context_t *dms_ctx, dms_opengauss_txn_snapshot_t *dms_txn_snapshot)
{
    dms_reset_error();
    dms_message_t dms_msg = { 0 };
    msg_opengauss_txn_snapshot_t req;
    dms_xmap_ctx_t *xmap_ctx = &dms_ctx->xmap_ctx;

    DMS_INIT_MESSAGE_HEAD(&req.head, MSG_REQ_OPENGAUSS_TXN_SNAPSHOT, 0, dms_ctx->inst_id,
        xmap_ctx->dest_id, dms_ctx->sess_id, CM_INVALID_ID16);
    req.head.size = (uint16)sizeof(msg_opengauss_txn_snapshot_t);

    dms_begin_stat(dms_ctx->sess_id, DMS_EVT_TXN_REQ_SNAPSHOT, CM_TRUE);

    int32 ret = mfc_send_data(&req.head);
    if (ret != CM_SUCCESS) {
        dms_end_stat(dms_ctx->sess_id);

        LOG_DEBUG_ERR("[TXN][request openGauss txn snapshot failed] src_inst %u src_sid %u dst_inst %u",
            dms_ctx->inst_id, dms_ctx->sess_id, xmap_ctx->dest_id);
        return ret;
    }

    ret = mfc_get_response(req.head.ruid, &dms_msg, DMS_WAIT_MAX_TIME);
    if (ret != CM_SUCCESS) {
        dms_end_stat(dms_ctx->sess_id);

        LOG_DEBUG_ERR("[TXN] receive message to instance(%u) failed, cmd(%u) ruid(%llu) errcode(%d)", xmap_ctx->dest_id,
            (uint32)MSG_REQ_OPENGAUSS_TXN_SNAPSHOT, req.head.ruid, ret);
        return ret;
    }

    dms_end_stat(dms_ctx->sess_id);

    if (dms_msg.head->cmd == MSG_ACK_OPENGAUSS_TXN_SNAPSHOT) {
        uint32 total_size = (uint32)(sizeof(dms_message_head_t) + sizeof(dms_opengauss_txn_snapshot_t));
        CM_CHK_RESPONSE_SIZE(&dms_msg, total_size, CM_FALSE);
        *dms_txn_snapshot = *(dms_opengauss_txn_snapshot_t *)(dms_msg.buffer + sizeof(dms_message_head_t));
        mfc_release_response(&dms_msg);
        return DMS_SUCCESS;
    } else {
        mfc_release_response(&dms_msg);
        DMS_THROW_ERROR(ERRNO_DMS_DCS_GET_TXN_SNAPSHOT_FAILED);
        return ERRNO_DMS_DCS_GET_TXN_SNAPSHOT_FAILED;
    }
}

int dms_request_opengauss_txn_of_master(dms_context_t *dms_ctx, dms_opengauss_txn_sw_info_t *dms_txn_swinfo)
{
    dms_reset_error();
    dms_message_t dms_msg = { 0 };
    msg_opengauss_txn_swinfo_t req;
    dms_xmap_ctx_t *xmap_ctx = &dms_ctx->xmap_ctx;

    DMS_INIT_MESSAGE_HEAD(&req.head, MSG_REQ_OPENGAUSS_TXN_SWINFO, 0, dms_ctx->inst_id,
        xmap_ctx->dest_id, dms_ctx->sess_id, CM_INVALID_ID16);
    req.proc_slot = dms_txn_swinfo->server_proc_slot;
    req.head.size = (uint16)sizeof(msg_opengauss_txn_swinfo_t);

    dms_begin_stat(dms_ctx->sess_id, DMS_EVT_TXN_REQ_INFO, CM_TRUE);

    int32 ret = mfc_send_data(&req.head);
    if (ret != CM_SUCCESS) {
        dms_end_stat(dms_ctx->sess_id);
        LOG_DEBUG_ERR("[TXN][request openGauss txn swinfo failed] src_inst %u src_sid %u dst_inst %u",
            dms_ctx->inst_id, dms_ctx->sess_id, xmap_ctx->dest_id);
        return ret;
    }

    ret = mfc_get_response(req.head.ruid, &dms_msg, DMS_WAIT_MAX_TIME);
    if (ret != CM_SUCCESS) {
        dms_end_stat(dms_ctx->sess_id);
        LOG_DEBUG_ERR("[TXN] receive message to instance(%u) failed, cmd(%u) ruid(%llu) errcode(%d)", xmap_ctx->dest_id,
            (uint32)MSG_REQ_OPENGAUSS_TXN_SWINFO, req.head.ruid, ret);
        return ret;
    }

    dms_end_stat(dms_ctx->sess_id);

    if (dms_msg.head->cmd == MSG_ACK_OPENGAUSS_TXN_SWINFO) {
        uint32 total_size = (uint32)(sizeof(dms_message_head_t) + sizeof(dms_opengauss_txn_sw_info_t));
        CM_CHK_RESPONSE_SIZE(&dms_msg, total_size, CM_FALSE);
        dms_opengauss_txn_sw_info_t received_swinfo =
            *(dms_opengauss_txn_sw_info_t *)(dms_msg.buffer + sizeof(dms_message_head_t));
        dms_txn_swinfo->sxid = received_swinfo.sxid;
        dms_txn_swinfo->scid = received_swinfo.scid;
        mfc_release_response(&dms_msg);
        return DMS_SUCCESS;
    } else {
        mfc_release_response(&dms_msg);
        DMS_THROW_ERROR(ERRNO_DMS_DCS_GET_TXN_INFO_FAILED);
        return ERRNO_DMS_DCS_GET_TXN_INFO_FAILED;
    }
}

int dms_request_txn_snapshot(dms_context_t *dms_ctx, dms_txn_snapshot_t *dms_txn_snapshot)
{
    dms_reset_error();
    dms_message_t message;
    msg_txn_snapshot_t req;
    dms_xmap_ctx_t *xmap_ctx = &dms_ctx->xmap_ctx;

    DMS_INIT_MESSAGE_HEAD(&req.head, MSG_REQ_TXN_SNAPSHOT, 0, dms_ctx->inst_id,
        xmap_ctx->dest_id, dms_ctx->sess_id, CM_INVALID_ID16);
    req.head.size = (uint16)sizeof(msg_txn_snapshot_t);
    req.xmap = xmap_ctx->xmap;

    dms_begin_stat(dms_ctx->sess_id, DMS_EVT_TXN_REQ_SNAPSHOT, CM_TRUE);
    DDES_FAULT_INJECTION_CALL(DMS_FI_REQ_TXN_SNAPSHOT, MSG_REQ_TXN_SNAPSHOT);

    int32 ret = mfc_send_data(&req.head);
    if (ret != CM_SUCCESS) {
        dms_end_stat(dms_ctx->sess_id);

        LOG_DEBUG_ERR("[TXN][request txn snapshot failed] src_inst %u src_sid %u dst_inst %u",
            dms_ctx->inst_id, dms_ctx->sess_id, xmap_ctx->dest_id);
        DMS_THROW_ERROR(ERRNO_DMS_SEND_MSG_FAILED, ret, MSG_REQ_TXN_SNAPSHOT, xmap_ctx->dest_id);
        return ERRNO_DMS_SEND_MSG_FAILED;
    }

    ret = mfc_get_response(req.head.ruid, &message, DMS_WAIT_MAX_TIME);
    if (ret != CM_SUCCESS) {
        dms_end_stat(dms_ctx->sess_id);
        LOG_DEBUG_ERR("[TXN] receive message to instance(%u) failed, cmd(%u) ruid(%llu) errcode(%d)", xmap_ctx->dest_id,
            (uint32)MSG_REQ_TXN_SNAPSHOT, req.head.ruid, ret);
        DMS_RETURN_IF_PROTOCOL_COMPATIBILITY_ERROR(ret);
        DMS_THROW_ERROR(ERRNO_DMS_COMMON_CBB_FAILED, ret);
        return ERRNO_DMS_COMMON_CBB_FAILED;
    }

    dms_end_stat(dms_ctx->sess_id);

    dms_message_head_t *ack_dms_head = get_dms_head(&message);
    if (ack_dms_head->cmd == MSG_ACK_TXN_SNAPSHOT) {
        CM_CHK_RESPONSE_SIZE((dms_message_t *)&message.buffer,
            (uint32)(sizeof(dms_message_head_t) + sizeof(dms_txn_snapshot_t)), CM_FALSE);
        *dms_txn_snapshot = *(dms_txn_snapshot_t *)(message.buffer + sizeof(dms_message_head_t));
        mfc_release_response(&message);
        if (dms_txn_snapshot->status == DMS_XACT_END) {
            g_dms.callback.update_global_scn(dms_ctx->db_handle, dms_txn_snapshot->scn);
        }
        return DMS_SUCCESS;
    } else {
        mfc_release_response(&message);
        DMS_THROW_ERROR(ERRNO_DMS_DCS_GET_TXN_SNAPSHOT_FAILED);
        return ERRNO_DMS_DCS_GET_TXN_SNAPSHOT_FAILED;
    }
}

void dcs_proc_txn_wait_req(dms_process_context_t *process_ctx, dms_message_t *receive_msg)
{
#ifdef OPENGAUSS
    /* pass */
#else
    msg_txn_wait_ack_t txn_wait_ack;

    CM_CHK_PROC_MSG_SIZE_NO_ERR(receive_msg, (uint32)sizeof(msg_txn_wait_request_t), CM_TRUE);
    msg_txn_wait_request_t *txn_wait_req = (msg_txn_wait_request_t *)(receive_msg->buffer);
    uint64 xid = txn_wait_req->xid;
    uint64 scn = 0;
    dms_txn_info_t txn_info;
    dms_message_head_t *req_head = receive_msg->head;
    int ret = g_dms.callback.get_txn_info(process_ctx->db_handle, xid, CM_FALSE, &txn_info);
    if (ret != DMS_SUCCESS) {
        cm_send_error_msg(req_head, ERRNO_DMS_CALLBACK_GET_TXN_INFO, "get txn info failed");
        return;
    }

    if (txn_info.status == DMS_XACT_END) {
        ret = DMS_REMOTE_TXN_END;
        scn = txn_info.scn;
    } else {
        if (drc_enqueue_txn(process_ctx->db_handle, &xid, req_head->src_inst, &txn_info) != DMS_SUCCESS) {
            cm_send_error_msg(req_head, ERRNO_DMS_CALLBACK_GET_TXN_INFO, "get txn info failed");
            return;
        }
        if (txn_info.status == DMS_XACT_END) {
            ret = DMS_REMOTE_TXN_END;
            scn = txn_info.scn;
        } else {
            ret = DMS_REMOTE_TXN_WAIT;
        }
    }

    dms_init_ack_head2(&txn_wait_ack.head, MSG_ACK_AWAKE_TXN, 0, req_head->dst_inst,
        req_head->src_inst, (uint16)process_ctx->sess_id, req_head->src_sid,
        req_head->msg_proto_ver);
    txn_wait_ack.head.size = (uint16)sizeof(msg_txn_wait_ack_t);
    txn_wait_ack.head.ruid = req_head->ruid;
    txn_wait_ack.status = ret;
    txn_wait_ack.scn = scn;

    DDES_FAULT_INJECTION_CALL(DMS_FI_ACK_AWAKE_TXN, MSG_ACK_AWAKE_TXN);
    if (mfc_send_data(&txn_wait_ack.head) != CM_SUCCESS) {
        LOG_DEBUG_ERR("[TXN] send txn info ack message failed, src_inst = %u, dst_inst = %u",
            (uint32)txn_wait_ack.head.src_inst, (uint32)txn_wait_ack.head.dst_inst);
    }
#endif
}

void dcs_proc_txn_awake_req(dms_process_context_t *process_ctx, dms_message_t *receive_msg)
{
#ifndef OPENGAUSS
    CM_CHK_PROC_MSG_SIZE_NO_ERR(receive_msg, (uint32)sizeof(msg_txn_awake_request_t), CM_FALSE);
    // machines are the same endian, if they are different, we need to adapt it.
    msg_txn_awake_request_t *txn_awake_req = (msg_txn_awake_request_t *)(receive_msg->buffer);
    uint64 xid = txn_awake_req->xid;
    uint64 scn = txn_awake_req->scn;

    g_dms.callback.update_global_scn(process_ctx->db_handle, scn);
    drc_local_txn_awake(&xid);
#endif
    // there is no ack msg.
}

static int32 dms_send_awake_txn_msg(dms_context_t *dms_ctx, uint32 dest_id)
{
    msg_txn_awake_request_t txn_awake_req;
    dms_message_head_t *head = &txn_awake_req.head;
    dms_xid_ctx_t *xid_ctx = &dms_ctx->xid_ctx;

    DMS_INIT_MESSAGE_HEAD(head, MSG_REQ_AWAKE_TXN, 0, dms_ctx->inst_id, dest_id,
        (uint16)dms_ctx->sess_id, CM_INVALID_ID16);
    txn_awake_req.xid = xid_ctx->xid;
    txn_awake_req.scn = xid_ctx->scn;
    head->size = (uint16)sizeof(msg_txn_awake_request_t);

    DDES_FAULT_INJECTION_CALL(DMS_FI_REQ_AWAKE_TXN, MSG_REQ_AWAKE_TXN);
    int32 ret = mfc_send_data_async(head);
    if (ret != CM_SUCCESS) {
        LOG_DEBUG_ERR("[TXN] send message to instance(%u) failed, cmd(%u) ruid(%llu) errcode(%d)",
            (uint32)xid_ctx->inst_id, (uint32)MSG_REQ_AWAKE_TXN, head->ruid, ret);
        DMS_THROW_ERROR(ERRNO_DMS_SEND_MSG_FAILED, ret, head->cmd, head->dst_inst);
        return ERRNO_DMS_SEND_MSG_FAILED;
    }
    return DMS_SUCCESS;
}

void dms_release_txn_cond(dms_context_t *dms_ctx)
{
    drc_res_ctx_t *ctx = DRC_RES_CTX;
    drc_res_map_t *res_map = &ctx->txn_res_map;
    drc_res_bucket_t *bucket = NULL;
    drc_txn_res_t *txn_res = NULL;
    uint64 xid = dms_ctx->xid_ctx.xid;

    bucket = drc_res_map_get_bucket(res_map, (char *)&xid, sizeof(uint64));
    cm_spin_lock(&bucket->lock, NULL);
    txn_res = (drc_txn_res_t *)drc_res_map_lookup(res_map, bucket, (char *)&xid, sizeof(uint64));
    if (txn_res == NULL) {
        cm_spin_unlock(&bucket->lock);
        return;
    }

    for (uint8 i = 0; i < g_dms.inst_cnt; i++) {
        if (bitmap64_exist(&txn_res->inst_map, i)) {
            if (i == g_dms.inst_id) {
                continue; // self instance
            }
            (void)dms_send_awake_txn_msg(dms_ctx, i);
        }
    }

    drc_res_map_del_res(res_map, bucket, (char *)&xid, sizeof(uint64));
    drc_release_txn_res(txn_res);
    cm_spin_unlock(&bucket->lock);
}

void dms_recycle_txn_cond(dms_context_t *dms_ctx)
{
    uint64 *xid = &dms_ctx->xid_ctx.xid;
    drc_local_txn_recycle(xid);
}

int dms_request_txn_cond_status(dms_context_t *dms_ctx, int *status)
{
    dms_reset_error();
    msg_txn_wait_request_t txn_wait_req;
    dms_message_head_t *head = &txn_wait_req.head;
    dms_xid_ctx_t *xid_ctx = &dms_ctx->xid_ctx;
    dms_message_t receive_msg = { 0 };

    DMS_INIT_MESSAGE_HEAD(head, MSG_REQ_WAIT_TXN, 0, dms_ctx->inst_id, xid_ctx->inst_id,
        (uint16)dms_ctx->sess_id, CM_INVALID_ID16);
    txn_wait_req.xid = xid_ctx->xid;
    head->size = (uint16)sizeof(txn_wait_req);

    DDES_FAULT_INJECTION_CALL(DMS_FI_REQ_WAIT_TXN, MSG_REQ_WAIT_TXN);
    int32 ret = mfc_send_data(head);
    if (ret != CM_SUCCESS) {
        LOG_DEBUG_ERR("[TXN] send message to instance(%u) failed, cmd(%u) ruid(%llu) errcode(%d)",
            (uint32)xid_ctx->inst_id, (uint32)MSG_REQ_WAIT_TXN, head->ruid, ret);
        DMS_THROW_ERROR(ERRNO_DMS_SEND_MSG_FAILED, ret, MSG_REQ_WAIT_TXN, xid_ctx->inst_id);
        return ERRNO_DMS_SEND_MSG_FAILED;
    }

    ret = mfc_get_response(head->ruid, &receive_msg, DMS_WAIT_MAX_TIME);
    if (ret != CM_SUCCESS) {
        LOG_DEBUG_ERR("[TXN] receive message to instance(%u) failed, cmd(%u) ruid(%llu) errcode(%d)",
            (uint32)xid_ctx->inst_id, (uint32)MSG_REQ_TXN_INFO, head->ruid, ret);
        DMS_RETURN_IF_PROTOCOL_COMPATIBILITY_ERROR(ret);
        DMS_THROW_ERROR(ERRNO_DMS_RECV_MSG_FAILED, ret, MSG_REQ_WAIT_TXN, xid_ctx->inst_id);
        return ERRNO_DMS_RECV_MSG_FAILED;
    }

    CM_CHK_RESPONSE_SIZE(&receive_msg, (uint32)sizeof(msg_txn_wait_ack_t), CM_FALSE);
    msg_txn_wait_ack_t *ack = (msg_txn_wait_ack_t *)(receive_msg.buffer);
    *status = ack->status;
    if (*status == DMS_REMOTE_TXN_END) {
        g_dms.callback.update_global_scn(dms_ctx->db_handle, ack->scn);
    }

    mfc_release_response(&receive_msg);
    return DMS_SUCCESS;
}

unsigned char dms_wait_txn_cond(dms_context_t *dms_ctx)
{
    dms_reset_error();
    uint64 *xid = &dms_ctx->xid_ctx.xid;
    return drc_local_txn_wait(xid);
}

int dms_request_imcstore_delta(dms_context_t *dms_ctx, unsigned int tableid, unsigned int rowgroup,
                               unsigned char* bitmap, unsigned long long* delta_max)
{
    dms_reset_error();
    dms_message_t dms_msg = { 0 };
    dms_imcstore_delta_req_t req;
    dms_xmap_ctx_t *xmap_ctx = &dms_ctx->xmap_ctx;

    DMS_INIT_MESSAGE_HEAD(&req.head, MSG_REQ_IMCSTORE_DELTA, 0, dms_ctx->inst_id, xmap_ctx->dest_id,
                          (uint16)dms_ctx->sess_id, CM_INVALID_ID16);
    req.tableid = tableid;
    req.rowgroup = rowgroup;
    req.head.size = (uint16)sizeof(dms_imcstore_delta_req_t);

    dms_begin_stat(dms_ctx->sess_id, DMS_EVT_REQ_IMCSTORE_DELTA, CM_TRUE);

    unsigned long long maxsize = IMCCTORE_DELTA_BITMAP_SIZE;
    for (int i = 0; i < maxsize;) {
        req.begin = i;
        int32 ret = mfc_send_data(&req.head);
        if (ret != CM_SUCCESS) {
            dms_end_stat(dms_ctx->sess_id);
            LOG_DEBUG_ERR("[IMCStore] send req message to instance(%u) failed, table(%u), rowgroup(%u)"
                          "ruid(%llu) errcode(%u)",
                          (uint32)dms_ctx->inst_id, tableid, rowgroup, req.head.ruid, (uint32)ret);
            return ret;
        }
        ret = mfc_get_response(req.head.ruid, &dms_msg, DMS_WAIT_MAX_TIME);
        if (ret != CM_SUCCESS) {
            dms_end_stat(dms_ctx->sess_id);
            LOG_DEBUG_ERR("[IMCStore] get response from instance(%u) failed, table(%u), rowgroup(%u)"
                          "ruid(%llu) errcode(%u)",
                          (uint32)dms_ctx->inst_id, tableid, rowgroup, req.head.ruid, (uint32)ret);
            return ret;
        }
        dms_imcstore_delta_ack_t *delta_ack = (dms_imcstore_delta_ack_t*)(dms_msg.buffer);

        errno_t err = memcpy_s(bitmap + req.begin, IMCCTORE_DELTA_BITMAP_SIZE - req.begin,
                               &delta_ack->bitmap, delta_ack->size);
        if (err != EOK) {
            LOG_DEBUG_ERR("[IMCStore] copy bitmap failed");
        }
        maxsize = delta_ack->max_size;
        i += delta_ack->size;
        mfc_release_response(&dms_msg);
    }
    dms_end_stat(dms_ctx->sess_id);
    *delta_max = maxsize;
    return DMS_SUCCESS;
}

void dms_proc_imcstore_delta(dms_process_context_t *proc_ctx, dms_message_t *receive_msg)
{
    CM_CHK_PROC_MSG_SIZE_NO_ERR(receive_msg, (uint32)sizeof(dms_imcstore_delta_req_t), CM_TRUE);
    dms_imcstore_delta_req_t req = *(dms_imcstore_delta_req_t *)receive_msg->buffer;

    unsigned char bitmap[IMCCTORE_DELTA_BITMAP_SIZE];
    dms_imcstore_delta_ack_t ack = { 0 };
    g_dms.callback.get_imcstore_delta(req.tableid, req.rowgroup, bitmap, &ack.max_size);

    dms_init_ack_head(&req.head, &ack.head, MSG_ACK_IMCSTORE_DELTA,
                      sizeof(dms_imcstore_delta_ack_t), proc_ctx->sess_id);

    unsigned int total = IMCCTORE_DELTA_BITMAP_SIZE;
    ack.size = (total - req.begin) > IMCSTORE_DELTA_PER_MESSAGE ? IMCSTORE_DELTA_PER_MESSAGE : (total - req.begin);
    errno_t err = memcpy_s(ack.bitmap, IMCSTORE_DELTA_PER_MESSAGE, bitmap + req.begin, ack.size);
    if (err != EOK) {
        LOG_DEBUG_ERR("[IMCStore] copy bitmap failed");
    }

    int32 ret = mfc_send_data(&ack.head);
    if (ret == DMS_SUCCESS) {
        LOG_DEBUG_INF("[DMS][dms_proc_imcstore_delta]: finished, dst_id = %u, dst_sid = %u",
                      (uint8)ack.head.dst_inst, (uint16)ack.head.dst_sid);
    } else {
        LOG_DEBUG_ERR("[DMS][dms_proc_imcstore_delta]: failed to send ack, dst_id = %u, dst_sid = %u, errcode = %u",
                      (uint8)ack.head.dst_inst, (uint16)ack.head.dst_sid, ret);
    }
}

int dms_request_opengauss_page_status(dms_context_t *dms_ctx, unsigned int page, int page_num,
    unsigned long int *page_map, int *bit_count)
{
    dms_reset_error();
    msg_opengauss_page_status_request_t status_req;
    dms_message_head_t *head = &status_req.head;
    dms_rfn_t *node = &dms_ctx->rfn;
    dms_message_t receive_msg = { 0 };

    DMS_INIT_MESSAGE_HEAD(head, MSG_REQ_OPENGAUSS_PAGE_STATUS, 0, dms_ctx->inst_id, node->inst_id,
        (uint16)dms_ctx->sess_id, CM_INVALID_ID16);
    status_req.rnode = node->rnode;
    status_req.page = page;
    status_req.page_num = page_num;
    status_req.bit_count = *bit_count;
    errno_t err = memcpy_s(status_req.page_map, sizeof(status_req.page_map), page_map, sizeof(status_req.page_map));
    if (err != EOK) {
        LOG_DEBUG_ERR("[PAGE] memcpy_s failed, errno = %d", err);
        DMS_THROW_ERROR(ERRNO_DMS_SECUREC_CHECK_FAIL);
        return ERRNO_DMS_SECUREC_CHECK_FAIL;
    }

    head->size = (uint16)sizeof(msg_opengauss_page_status_request_t);

    dms_begin_stat(dms_ctx->sess_id, DMS_EVT_PAGE_STATUS_INFO, CM_TRUE);

    int32 ret = mfc_send_data(head);
    if (ret != CM_SUCCESS) {
        dms_end_stat(dms_ctx->sess_id);
        LOG_DEBUG_ERR("[PAGE] send message to instance(%u) failed, cmd(%u) ruid(%llu) errcode(%u)",
            (uint32)node->inst_id, (uint32)MSG_REQ_OPENGAUSS_PAGE_STATUS, head->ruid, (uint32)ret);
        return ret;
    }

    ret = mfc_get_response(head->ruid, &receive_msg, DMS_WAIT_MAX_TIME);
    if (ret != CM_SUCCESS) {
        dms_end_stat(dms_ctx->sess_id);
        LOG_DEBUG_ERR("[PAGE] receive message to instance(%u) failed, cmd(%u) ruid(%llu) errcode(%u)",
            (uint32)node->inst_id, (uint32)MSG_REQ_OPENGAUSS_PAGE_STATUS, head->ruid, (uint32)ret);
        return ret;
    }

    dms_end_stat(dms_ctx->sess_id);

    CM_CHK_RESPONSE_SIZE(&receive_msg,
        (uint32)(sizeof(dms_message_head_t) + sizeof(dms_opengauss_page_status_result_t)), CM_FALSE);
    dms_opengauss_page_status_result_t status_result;
    err = memcpy_s(&status_result, sizeof(dms_opengauss_page_status_result_t),
        (receive_msg.buffer + sizeof(dms_message_head_t)), sizeof(dms_opengauss_page_status_result_t));
    if (err != EOK) {
        mfc_release_response(&receive_msg);
        LOG_DEBUG_ERR("[PAGE] memcpy_s failed, errno = %d", err);
        DMS_THROW_ERROR(ERRNO_DMS_SECUREC_CHECK_FAIL);
        return ERRNO_DMS_SECUREC_CHECK_FAIL;
    }
    *bit_count = status_result.bit_count;
    err = memcpy_s(page_map, sizeof(status_result.page_map), status_result.page_map, sizeof(status_result.page_map));
    if (err != EOK) {
        mfc_release_response(&receive_msg);
        LOG_DEBUG_ERR("[PAGE] memcpy_s failed, errno = %d", err);
        DMS_THROW_ERROR(ERRNO_DMS_SECUREC_CHECK_FAIL);
        return ERRNO_DMS_SECUREC_CHECK_FAIL;
    }
    mfc_release_response(&receive_msg);
    return DMS_SUCCESS;
}

void dcs_proc_opengauss_page_status_req(dms_process_context_t *process_ctx, dms_message_t *receive_msg)
{
    dms_message_head_t *req_head = receive_msg->head;
    dms_message_head_t ack_head;

    CM_CHK_PROC_MSG_SIZE_NO_ERR(receive_msg, (uint32)sizeof(msg_opengauss_page_status_request_t), CM_TRUE);
    msg_opengauss_page_status_request_t *status_req = (msg_opengauss_page_status_request_t *)(receive_msg->buffer);
    dms_opengauss_page_status_result_t page_result = { 0 };

    unsigned int page = status_req->page;
    dms_opengauss_relfilenode_t *rnode = &status_req->rnode;
    int page_num = status_req->page_num;
    page_result.bit_count = status_req->bit_count;
    errno_t err = memcpy_s(page_result.page_map, sizeof(page_result.page_map), status_req->page_map,
        sizeof(page_result.page_map));
    if (err != EOK) {
        DMS_THROW_ERROR(ERRNO_DMS_SECUREC_CHECK_FAIL);
        cm_ack_result_msg(process_ctx, receive_msg, MSG_ACK_ERROR, DMS_ERROR);
        return;
    }

    int ret = g_dms.callback.get_opengauss_page_status(process_ctx->db_handle, rnode, page, page_num, &page_result);
    if (ret != DMS_SUCCESS) {
        DMS_THROW_ERROR(ERRNO_DMS_DCS_GET_PAGE_IN_BUFFER_FAILED, ret);
        cm_ack_result_msg(process_ctx, receive_msg, MSG_ACK_ERROR, ret);
        return;
    }

    dms_init_ack_head2(&ack_head, MSG_ACK_OPENGAUSS_PAGE_STATUS, 0, req_head->dst_inst, req_head->src_inst,
        (uint16)process_ctx->sess_id, req_head->src_sid, req_head->msg_proto_ver);
    ack_head.size = (uint16)(sizeof(dms_opengauss_page_status_result_t) + sizeof(dms_message_head_t));
    ack_head.ruid = req_head->ruid;

    if (mfc_send_data3(&ack_head, sizeof(dms_message_head_t), &page_result) != CM_SUCCESS) {
        LOG_DEBUG_ERR("[PAGE] send openGauss page status ack message failed, src_inst = %u, dst_inst = %u",
            (uint32)ack_head.src_inst, (uint32)ack_head.dst_inst);
    }
}

int dms_send_opengauss_oldest_xmin(dms_context_t *dms_ctx, uint64 oldest_xmin, unsigned char dest_id)
{
    msg_send_opengauss_oldest_xmin_t send_msg;
    DMS_INIT_MESSAGE_HEAD(&send_msg.head, MSG_REQ_SEND_OPENGAUSS_OLDEST_XMIN, 0, dms_ctx->inst_id,
        dest_id, dms_ctx->sess_id, CM_INVALID_ID16);
    send_msg.head.size = sizeof(msg_send_opengauss_oldest_xmin_t);
    send_msg.oldest_xmin = oldest_xmin;
    int ret = CM_SUCCESS;

    dms_begin_stat(dms_ctx->sess_id, DMS_EVT_OPENGAUSS_SEND_XMIN, CM_TRUE);
    ret = mfc_send_data(&send_msg.head);
    if (ret != CM_SUCCESS) {
        dms_end_stat(dms_ctx->sess_id);
        LOG_DEBUG_WAR("[OG XMIN] send openGauss oldest xmin failed, src_inst:%u, src_sid:%u, "
            "dst_inst:%u, ruid:%llu, oldest_xmin:%llu",
            send_msg.head.src_inst, send_msg.head.src_sid, send_msg.head.dst_inst, send_msg.head.ruid, oldest_xmin);
        return ret;
    }
    LOG_DEBUG_INF("[OG XMIN] send openGauss oldest xmin success, src_inst:%u, src_sid:%u, "
        "dst_inst:%u, ruid:%llu",
        send_msg.head.src_inst, send_msg.head.src_sid, send_msg.head.dst_inst, send_msg.head.ruid);

    dms_message_t ack_msg = { 0 };
    ret = mfc_get_response(send_msg.head.ruid, &ack_msg, DMS_WAIT_MAX_TIME);
    if (ret != CM_SUCCESS) {
        dms_end_stat(dms_ctx->sess_id);
        LOG_DEBUG_WAR("[OG XMIN] wait receive openGauss oldest xmin ack failed, src_inst:%u, src_sid:%u, "
            "dst_inst:%u, ruid:%llu",
            send_msg.head.src_inst, send_msg.head.src_sid, send_msg.head.dst_inst, send_msg.head.ruid);
        return ret;
    }
    dms_end_stat(dms_ctx->sess_id);
    LOG_DEBUG_INF("[OG XMIN] receive openGauss oldest xmin ack success, src_inst:%u, src_sid:%u, "
        "dst_inst:%u, ruid:%llu",
        send_msg.head.src_inst, send_msg.head.src_sid, send_msg.head.dst_inst, send_msg.head.ruid);
    mfc_release_response(&ack_msg);
    return ret;
}

void dcs_proc_send_opengauss_oldest_xmin(dms_process_context_t *process_ctx, dms_message_t *receive_msg)
{
    CM_CHK_PROC_MSG_SIZE_NO_ERR(receive_msg, (uint32)sizeof(msg_send_opengauss_oldest_xmin_t), CM_TRUE);
    msg_send_opengauss_oldest_xmin_t recv_msg = *(msg_send_opengauss_oldest_xmin_t*)receive_msg->buffer;

    uint64 oldest_xmin = recv_msg.oldest_xmin;
    LOG_DEBUG_INF("[OG XMIN] receive openGauss oldest xmin, src_inst:%u, src_sid:%u, dst_inst:%u, ruid:%llu, "
        "oldest_xmin:%llu",
        recv_msg.head.src_inst, recv_msg.head.src_sid, recv_msg.head.dst_inst, recv_msg.head.ruid, oldest_xmin);
    g_dms.callback.update_node_oldest_xmin(process_ctx->db_handle, recv_msg.head.src_inst, oldest_xmin);

    dms_message_head_t ack_msg;
    dms_init_ack_head(&recv_msg.head, &ack_msg, MSG_ACK_SEND_OPENGAUSS_OLDEST_XMIN, sizeof(dms_message_head_t),
        process_ctx->sess_id);
    int ret = mfc_send_data(&ack_msg);
    if (ret != CM_SUCCESS) {
        LOG_DEBUG_WAR("[OG XMIN] send openGauss oldest xmin ack failed, src_inst:%u,"
            "src_sid:%u, dst_inst:%u, ruid:%llu",
            ack_msg.src_inst, ack_msg.src_sid, ack_msg.dst_inst, ack_msg.ruid);
        return;
    }
    LOG_DEBUG_INF("[OG XMIN] send openGauss oldest xmin ack success, src_inst:%u, src_sid:%u, dst_inst:%u, ruid:%llu",
        ack_msg.src_inst, ack_msg.src_sid, ack_msg.dst_inst, ack_msg.ruid);
}

static bool8 dms_proc_check_xid_valid(drc_global_xid_t *xid)
{
    if (xid->bqual_len > DMS_MAX_XA_BASE16_BQUAL_LEN || xid->gtrid_len > DMS_MAX_XA_BASE16_GTRID_LEN) {
        LOG_RUN_ERR("[DMS]: invalid global xa xid");
        return CM_FALSE;
    }

    text_t gtrid, bqual;
    cm_str2text_safe(xid->gtrid, xid->gtrid_len, &gtrid);
    cm_str2text_safe(xid->bqual, xid->bqual_len, &bqual);
    if (cm_chk_and_upper_base16(&gtrid) != CM_SUCCESS) {
        LOG_RUN_ERR("[DMS][%s]: invalid global transaction ID", cm_display_resid((char *)xid, DRC_RES_GLOBAL_XA_TYPE));
        return CM_FALSE;
    }

    if (cm_chk_and_upper_base16(&bqual) != CM_SUCCESS) {
        LOG_RUN_ERR("[DMS][%s]: invalid global transaction branch ID", cm_display_resid((char *)xid,
            DRC_RES_GLOBAL_XA_TYPE));
        return CM_FALSE;
    }

    return CM_TRUE;
}

int32 dms_request_create_xa_res(dms_context_t *dms_ctx, uint8 master_id, uint8 undo_set_id, uint32 *result_code)
{
    dms_xa_res_req_t req;

    dms_begin_stat(dms_ctx->sess_id, DMS_EVT_DCS_REQ_CREATE_XA_RES, CM_TRUE);
    DMS_INIT_MESSAGE_HEAD(&req.head, MSG_REQ_CREATE_GLOBAL_XA_RES, 0, dms_ctx->inst_id, master_id, dms_ctx->sess_id,
        CM_INVALID_ID16);
    req.head.size = (uint16)sizeof(dms_xa_res_req_t);
    req.oper_type = DMS_XA_OPER_CREATE;
    req.undo_set_id = undo_set_id;
    errno_t ret = memcpy_sp(&req.xa_xid, sizeof(drc_global_xid_t), &dms_ctx->global_xid, sizeof(drc_global_xid_t));
    if (ret != EOK) {
        dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_CREATE_XA_RES);
        DMS_THROW_ERROR(ERR_SYSTEM_CALL, ret);
        return ret;
    }

    LOG_DEBUG_INF("[DMS][%s][dms_request_create_xa_res]: src_id = %u, dst_id = %u",
        cm_display_resid((char *)&dms_ctx->global_xid, DRC_RES_GLOBAL_XA_TYPE), dms_ctx->inst_id, (uint32)master_id);
    DDES_FAULT_INJECTION_CALL(DMS_FI_REQ_CREATE_GLOBAL_XA_RES, MSG_REQ_CREATE_GLOBAL_XA_RES);
    int32 ret_code = mfc_send_data(&req.head);
    if (ret_code != DMS_SUCCESS) {
        LOG_DEBUG_ERR("[DMS][%s][dms_request_create_xa_res]: failed to send create xa res request",
            cm_display_resid((char *)&dms_ctx->global_xid, DRC_RES_GLOBAL_XA_TYPE));
        dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_CREATE_XA_RES);
        DMS_THROW_ERROR(ERRNO_DMS_SEND_MSG_FAILED, ret, MSG_REQ_CREATE_GLOBAL_XA_RES, master_id);
        return ERRNO_DMS_SEND_MSG_FAILED;
    }

    dms_message_t recv_msg = { 0 };
    ret_code = mfc_get_response(req.head.ruid, &recv_msg, DMS_WAIT_MAX_TIME);
    if (ret_code != DMS_SUCCESS) {
        LOG_DEBUG_ERR("[DMS][%s][dms_request_create_xa_res]: wait master node ack timeout, timeout = %u ms",
            cm_display_resid((char *)&dms_ctx->global_xid, DRC_RES_GLOBAL_XA_TYPE), DMS_WAIT_MAX_TIME);
        dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_CREATE_XA_RES);
        DMS_THROW_ERROR(ERRNO_DMS_RECV_MSG_FAILED, ret, MSG_REQ_CREATE_GLOBAL_XA_RES, master_id);
        return ERRNO_DMS_RECV_MSG_FAILED;
    }

    if (recv_msg.head->cmd == MSG_ACK_ERROR) {
        cm_print_error_msg_and_throw_error(recv_msg.buffer);
        dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_CREATE_XA_RES);
        mfc_release_response(&recv_msg);
        return ERRNO_DMS_COMMON_MSG_ACK;
    }

    LOG_DEBUG_INF("[DMS][%s]: src_id=%u, src_sid=%u, dst_id=%u, dst_sid=%u, flag=%u, ruid=%llu",
        cm_display_resid((char *)&dms_ctx->global_xid, DRC_RES_GLOBAL_XA_TYPE), (uint32)recv_msg.head->src_inst,
        (uint32)recv_msg.head->src_sid, (uint32)recv_msg.head->dst_inst,
        (uint32)recv_msg.head->dst_sid, (uint32)recv_msg.head->flags, recv_msg.head->ruid);

    dms_xa_res_ack_t *result_msg = (dms_xa_res_ack_t *)recv_msg.buffer;
    *result_code = result_msg->return_code;
    dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_CREATE_XA_RES);
    mfc_release_response(&recv_msg);
    LOG_DEBUG_INF("[DMS][%s][dms_request_create_xa_res]: create xa res remote success, master_id = %u",
        cm_display_resid((char *)&dms_ctx->global_xid, DRC_RES_GLOBAL_XA_TYPE), (uint32)master_id);
    return DMS_SUCCESS;
}

void dms_proc_create_xa_res(dms_process_context_t *proc_ctx, dms_message_t *receive_msg)
{
    CM_CHK_PROC_MSG_SIZE_NO_ERR(receive_msg, (uint32)sizeof(dms_xa_res_req_t), CM_TRUE);
    dms_xa_res_req_t req = *(dms_xa_res_req_t *)receive_msg->buffer;

    drc_global_xid_t *xid = &req.xa_xid;
    if (!dms_proc_check_xid_valid(xid)) {
        cm_send_error_msg(receive_msg->head, ERRNO_DMS_DRC_XA_RES_NOT_EXISTS, "invalid xid for xa creation");
        return;
    }

    int32 ret = drc_create_xa_res(proc_ctx->db_handle, proc_ctx->sess_id, xid, req.head.src_inst, req.undo_set_id,
        CM_TRUE);
    dms_xa_res_ack_t ack = { 0 };
    dms_init_ack_head(&req.head, &ack.head, MSG_ACK_CREATE_GLOBAL_XA_RES, sizeof(dms_xa_res_ack_t),
        proc_ctx->sess_id);
    ack.return_code = (uint32)ret;

    DDES_FAULT_INJECTION_CALL(DMS_FI_ACK_CREATE_GLOBAL_XA_RES, MSG_ACK_CREATE_GLOBAL_XA_RES);
    ret = mfc_send_data(&ack.head);
    if (ret != DMS_SUCCESS) {
        LOG_DEBUG_ERR("[DMS][%s][dms_proc_create_xa_res]: failed to send ack, dst_id = %u, dst_sid = %u, errcode = %d",
            cm_display_resid((char *)xid, DRC_RES_GLOBAL_XA_TYPE), (uint8)ack.head.dst_inst, (uint16)ack.head.dst_sid, 
            ret);
    } else {
        LOG_DEBUG_INF("[DMS][%s][dms_proc_create_xa_res]: finished, dst_id = %u, dst_sid = %u",
            cm_display_resid((char *)xid, DRC_RES_GLOBAL_XA_TYPE), (uint8)ack.head.dst_inst, (uint16)ack.head.dst_sid);
    }
}

int32 dms_request_delete_xa_res(dms_context_t *dms_ctx, uint8 master_id, uint32 *result_code)
{
    dms_xa_res_req_t req;

    dms_begin_stat(dms_ctx->sess_id, DMS_EVT_DCS_REQ_DELETE_XA_RES, CM_TRUE);
    DMS_INIT_MESSAGE_HEAD(&req.head, MSG_REQ_DELETE_GLOBAL_XA_RES, 0, dms_ctx->inst_id, master_id, dms_ctx->sess_id,
        CM_INVALID_ID16);

    req.head.size = (uint16)sizeof(dms_xa_res_req_t);
    req.oper_type = DMS_XA_OPER_DELETE;
    errno_t ret = memcpy_sp(&req.xa_xid, sizeof(drc_global_xid_t), &dms_ctx->global_xid, sizeof(drc_global_xid_t));
    if (ret != EOK) {
        dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_DELETE_XA_RES);
        DMS_THROW_ERROR(ERR_SYSTEM_CALL, ret);
        return ret;
    }

    LOG_DEBUG_INF("[DMS][%s][dms_request_delete_xa_res]: src_id = %u, dst_id = %u",
        cm_display_resid((char *)&dms_ctx->global_xid, DRC_RES_GLOBAL_XA_TYPE), dms_ctx->inst_id, (uint32)master_id);
    DDES_FAULT_INJECTION_CALL(DMS_FI_REQ_DELETE_GLOBAL_XA_RES, MSG_REQ_DELETE_GLOBAL_XA_RES);
    int32 ret_code = mfc_send_data(&req.head);
    if (ret_code != DMS_SUCCESS) {
        LOG_DEBUG_ERR("[DMS][%s][dms_request_delete_xa_res]: failed to send delete xa res request",
            cm_display_resid((char *)&dms_ctx->global_xid, DRC_RES_GLOBAL_XA_TYPE));
        dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_DELETE_XA_RES);
        DMS_THROW_ERROR(ERRNO_DMS_SEND_MSG_FAILED, ret_code, MSG_REQ_DELETE_GLOBAL_XA_RES, master_id);
        return ERRNO_DMS_SEND_MSG_FAILED;
    }

    dms_message_t recv_msg = { 0 };
    ret_code = mfc_get_response(req.head.ruid, &recv_msg, DMS_WAIT_MAX_TIME);
    if (ret_code != DMS_SUCCESS) {
        LOG_DEBUG_ERR("[DMS][%s][dms_request_delete_xa_res]: wait master node ack timeout, timeout = %u ms",
            cm_display_resid((char *)&dms_ctx->global_xid, DRC_RES_GLOBAL_XA_TYPE), DMS_WAIT_MAX_TIME);
        dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_DELETE_XA_RES);
        DMS_THROW_ERROR(ERRNO_DMS_RECV_MSG_FAILED, ret_code, MSG_REQ_DELETE_GLOBAL_XA_RES, master_id);
        return ERRNO_DMS_RECV_MSG_FAILED;
    }

    if (recv_msg.head->cmd == MSG_ACK_ERROR) {
        cm_print_error_msg_and_throw_error(recv_msg.buffer);
        dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_DELETE_XA_RES);
        mfc_release_response(&recv_msg);
        return ERRNO_DMS_COMMON_MSG_ACK;
    }

    LOG_DEBUG_INF("[DMS][%s]: src_id = %u, src_sid = %u, dst_id = %u, dst_sid = %u, flag = %u, ruid = %llu",
        cm_display_resid((char *)&dms_ctx->global_xid, DRC_RES_GLOBAL_XA_TYPE), (uint32)recv_msg.head->src_inst,
        (uint32)recv_msg.head->src_sid, (uint32)recv_msg.head->dst_inst,
        (uint32)recv_msg.head->dst_sid, (uint32)recv_msg.head->flags, recv_msg.head->ruid);

    dms_xa_res_ack_t *result_msg = (dms_xa_res_ack_t *)recv_msg.buffer;
    *result_code = result_msg->return_code;
    dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_DELETE_XA_RES);
    mfc_release_response(&recv_msg);
    LOG_DEBUG_INF("[DMS][%s][dms_request_delete_xa_res]: delete xa res remote success, master_id = %u",
        cm_display_resid((char *)&dms_ctx->global_xid, DRC_RES_GLOBAL_XA_TYPE), (uint32)master_id);
    return DMS_SUCCESS;
}

void dms_proc_delete_xa_res(dms_process_context_t *proc_ctx, dms_message_t *receive_msg)
{
    CM_CHK_PROC_MSG_SIZE_NO_ERR(receive_msg, (uint32)sizeof(dms_xa_res_req_t), CM_TRUE);
    dms_xa_res_req_t req = *(dms_xa_res_req_t *)receive_msg->buffer;

    drc_global_xid_t *xid = &req.xa_xid;
    if (!dms_proc_check_xid_valid(xid)) {
        cm_send_error_msg(receive_msg->head, ERRNO_DMS_DRC_XA_RES_NOT_EXISTS, "invalid xid for xa deletion");
        return;
    }

    int32 ret = drc_delete_xa_res(xid, CM_TRUE);
    dms_xa_res_ack_t ack = { 0 };
    dms_init_ack_head(&req.head, &ack.head, MSG_ACK_DELETE_GLOBAL_XA_RES, sizeof(dms_xa_res_ack_t),
        proc_ctx->sess_id);
    ack.return_code = (uint32)ret;

    DDES_FAULT_INJECTION_CALL(DMS_FI_ACK_DELETE_GLOBAL_XA_RES, MSG_ACK_DELETE_GLOBAL_XA_RES);
    ret = mfc_send_data(&ack.head);
    if (ret != DMS_SUCCESS) {
        LOG_DEBUG_ERR("[DMS][%s][dms_proc_delete_xa_res]: failed to send ack, dst_id = %u, dst_sid = %u, errcode = %d",
            cm_display_resid((char *)xid, DRC_RES_GLOBAL_XA_TYPE), (uint8)ack.head.dst_inst, (uint16)ack.head.dst_sid,
            ret);
    } else {
        LOG_DEBUG_INF("[DMS][%s][dms_proc_delete_xa_res]: send delete xa res ack finished, dst_id = %u, dst_sid = %u",
            cm_display_resid((char *)xid, DRC_RES_GLOBAL_XA_TYPE), (uint8)ack.head.dst_inst, (uint16)ack.head.dst_sid);
    }
}

void dms_proc_ask_xa_owner(dms_process_context_t *ctx, dms_message_t *receive_msg)
{
    CM_CHK_PROC_MSG_SIZE_NO_ERR(receive_msg, (uint32)sizeof(dms_ask_xa_owner_req_t), CM_TRUE);
    dms_ask_xa_owner_req_t req = *(dms_ask_xa_owner_req_t *)receive_msg->buffer;

    drc_global_xid_t *xid = &req.xa_xid;
    if (!dms_proc_check_xid_valid(xid)) {
        cm_send_error_msg(receive_msg->head, ERRNO_DMS_DRC_XA_RES_NOT_EXISTS, "invalid xid for req xa owner");
        return;
    }

    uint8 owner_id = CM_INVALID_ID8;
    drc_global_xa_res_t *xa_res = NULL;
    int32 ret = drc_enter_xa_res(xid, &xa_res, CM_TRUE);
    if (ret != DMS_SUCCESS) {
        dms_send_error_ack(ctx, req.head.src_inst, req.head.src_sid, req.head.ruid, ret, req.head.msg_proto_ver);
        return;
    }

    if (xa_res != NULL) {
        owner_id = xa_res->owner_id;
    }

    drc_global_res_map_t *xa_res_map = drc_get_global_res_map(DRC_RES_GLOBAL_XA_TYPE);
    drc_res_bucket_t *bucket = drc_res_map_get_bucket(&xa_res_map->res_map, (char *)xid, sizeof(drc_global_xid_t));
    drc_leave_xa_res(xa_res_map, bucket);

    dms_ask_xa_owner_ack_t ack = { 0 };
    dms_init_ack_head(&req.head, &ack.head, MSG_ACK_ASK_XA_OWNER_ID, sizeof(dms_ask_xa_owner_ack_t), ctx->sess_id);
    ack.owner_id = owner_id;

    LOG_DEBUG_INF("[DMS][%s][dms_proc_ask_xa_owner]: src_id = %u, src_sid = %u",
        cm_display_resid((char *)xid, DRC_RES_GLOBAL_XA_TYPE), (uint8)req.head.src_inst, (uint16)req.head.src_sid);

    DDES_FAULT_INJECTION_CALL(DMS_FI_ACK_ASK_XA_OWNER_ID, MSG_ACK_ASK_XA_OWNER_ID);
    ret = mfc_send_data(&ack.head);
    if (ret == DMS_SUCCESS) {
        LOG_DEBUG_INF("[DMS][%s][dms_proc_ask_xa_owner]: finished, dst_id = %u, dst_sid = %u",
            cm_display_resid((char *)xid, DRC_RES_GLOBAL_XA_TYPE), (uint8)ack.head.dst_inst, (uint16)ack.head.dst_sid);
    } else {
        LOG_DEBUG_ERR("[DMS][%s][dms_proc_ask_xa_owner]: failed to send ack, dst_id = %u, dst_sid = %u, errcode = %d",
            cm_display_resid((char *)xid, DRC_RES_GLOBAL_XA_TYPE), (uint8)ack.head.dst_inst, (uint16)ack.head.dst_sid,
            ret);
    }
}

static int32 dms_ask_xa_owner_local(dms_context_t *dms_ctx, uint8 *owner_id)
{
    drc_global_xa_res_t *xa_res = NULL;
    drc_global_xid_t *global_xid = &dms_ctx->global_xid;
    drc_global_res_map_t *res_map = drc_get_global_res_map(DRC_RES_GLOBAL_XA_TYPE);
    drc_res_bucket_t *bucket = drc_res_map_get_bucket(&res_map->res_map, (char *)global_xid, sizeof(drc_global_xid_t));

    int32 ret = drc_enter_xa_res(global_xid, &xa_res, CM_TRUE);
    if (ret != DMS_SUCCESS) {
        return ret;
    }

    if (xa_res == NULL || xa_res->owner_id == CM_INVALID_ID8) {
        LOG_DEBUG_ERR("[TXN][%s][dms_ask_xa_owner_local]: xa res not exists", cm_display_resid((char *)global_xid,
            DRC_RES_GLOBAL_XA_TYPE));
        drc_leave_xa_res(res_map, bucket);
        DMS_THROW_ERROR(ERRNO_DMS_DRC_XA_RES_NOT_EXISTS, cm_display_resid((char *)global_xid, DRC_RES_GLOBAL_XA_TYPE));
        return ERRNO_DMS_DRC_XA_RES_NOT_EXISTS;
    }

    *owner_id = xa_res->owner_id;
    drc_leave_xa_res(res_map, bucket);
    return DMS_SUCCESS;
}

static int32 dms_ask_xa_owner_remote(dms_context_t *dms_ctx, uint8 master_id, uint8 *owner_id)
{
    dms_ask_xa_owner_req_t req = { 0 };
    drc_global_xid_t *global_xid = &dms_ctx->global_xid;

    dms_begin_stat(dms_ctx->sess_id, DMS_EVT_DCS_REQ_XA_OWNER_ID, CM_TRUE);
    DMS_INIT_MESSAGE_HEAD(&req.head, MSG_REQ_ASK_XA_OWNER_ID, 0, dms_ctx->inst_id, master_id, dms_ctx->sess_id,
        CM_INVALID_ID16);
    req.head.size = (uint16)sizeof(dms_ask_xa_owner_req_t);
    req.sess_type = dms_ctx->sess_type;
    errno_t err = memcpy_sp((char *)&req.xa_xid, sizeof(drc_global_xid_t), &dms_ctx->global_xid,
        sizeof(drc_global_xid_t));
    if (err != EOK) {
        dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_XA_OWNER_ID);
        DMS_THROW_ERROR(ERR_SYSTEM_CALL, err);
        return err;
    }

    DDES_FAULT_INJECTION_CALL(DMS_FI_REQ_ASK_XA_OWNER_ID, MSG_REQ_ASK_XA_OWNER_ID);
    int32 ret = mfc_send_data(&req.head);
    if (ret != DMS_SUCCESS) {
        dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_XA_OWNER_ID);
        DMS_THROW_ERROR(ERRNO_DMS_SEND_MSG_FAILED, ret, MSG_REQ_ASK_XA_OWNER_ID, master_id);
        return ERRNO_DMS_SEND_MSG_FAILED;
    }

    LOG_DEBUG_INF("[TXN][%s][dms_ask_xa_owner_remote]: src_id = %u, dst_id = %u", cm_display_resid((char *)global_xid,
        DRC_RES_GLOBAL_XA_TYPE), dms_ctx->inst_id, master_id);
    dms_message_t msg = { 0 };
    ret = mfc_get_response(req.head.ruid, &msg, DMS_WAIT_MAX_TIME);
    if (ret != DMS_SUCCESS) {
        dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_XA_OWNER_ID);
        LOG_DEBUG_ERR("[TXN][%s][dms_ask_xa_owner_remote]: wait owner ack of xa res timeout timeout=%d ms",
            cm_display_resid((char *)global_xid, DRC_RES_GLOBAL_XA_TYPE), DMS_WAIT_MAX_TIME);
        DMS_THROW_ERROR(ERRNO_DMS_RECV_MSG_FAILED, ret, MSG_REQ_CREATE_GLOBAL_XA_RES, master_id);
        return ERRNO_DMS_DCS_ASK_FOR_RES_MSG_FAULT;
    }

    if (msg.head->cmd == MSG_ACK_ERROR) {
        cm_print_error_msg_and_throw_error(msg.buffer);
        dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_XA_OWNER_ID);
        mfc_release_response(&msg);
        return ERRNO_DMS_COMMON_MSG_ACK;
    }

    CM_CHK_RESPONSE_SIZE(&msg, sizeof(dms_ask_xa_owner_ack_t), CM_FALSE);
    dms_ask_xa_owner_ack_t *ack = (dms_ask_xa_owner_ack_t *)msg.buffer;
    if (ack->owner_id == CM_INVALID_ID8) {
        LOG_DEBUG_ERR("[TXN][%s][dms_ask_xa_owner_remote]: xa res not exists", cm_display_resid((char *)global_xid,
            DRC_RES_GLOBAL_XA_TYPE));
        mfc_release_response(&msg);
        dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_XA_OWNER_ID);
        DMS_THROW_ERROR(ERRNO_DMS_DRC_XA_RES_NOT_EXISTS, cm_display_resid((char *)global_xid, DRC_RES_GLOBAL_XA_TYPE));
        return ERRNO_DMS_DRC_XA_RES_NOT_EXISTS;
    }

    *owner_id = ack->owner_id;
    mfc_release_response(&msg);
    dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_XA_OWNER_ID);
    return DMS_SUCCESS;
}

int32 dms_request_xa_owner(dms_context_t *dms_ctx, unsigned char *owner_id)
{
    drc_global_xid_t *global_xid = &dms_ctx->global_xid;

    LOG_DEBUG_INF("[TXN][%s] dms_request_xa_owner", cm_display_resid((char *)global_xid,
        DRC_RES_GLOBAL_XA_TYPE));

    uint8 master_id = 0xFF;
    int32 ret = drc_get_master_id((char *)global_xid, DRC_RES_GLOBAL_XA_TYPE, &master_id);
    if (ret != DMS_SUCCESS) {
        LOG_DEBUG_ERR("[TXN][%s] failed to get master id when get xa owner id", cm_display_resid((char *)global_xid,
            DRC_RES_GLOBAL_XA_TYPE));
        return ret;
    }

    if (master_id == dms_ctx->inst_id) {
        return dms_ask_xa_owner_local(dms_ctx, owner_id);
    } else {
        return dms_ask_xa_owner_remote(dms_ctx, master_id, owner_id);
    }
}

int32 dms_request_end_xa(dms_context_t *dms_ctx, uint8 owner_id, uint64 flags, uint64 scn, bool8 is_commit,
    int32 *return_code)
{
    dms_end_xa_req_t req = { 0 };
    drc_global_xid_t *xid = &dms_ctx->global_xid;

    dms_begin_stat(dms_ctx->sess_id, DMS_EVT_DCS_REQ_END_XA, CM_TRUE);
    DMS_INIT_MESSAGE_HEAD(&req.head, MSG_REQ_END_XA, 0, dms_ctx->inst_id, owner_id, dms_ctx->sess_id, CM_INVALID_ID16);
    req.head.size = (uint16)sizeof(dms_end_xa_req_t);
    req.commit_scn = scn;
    req.flags = flags;
    req.is_commit = is_commit;
    errno_t err = memcpy_sp(&req.xa_xid, sizeof(drc_global_xid_t), xid, sizeof(drc_global_xid_t));
    if (err != EOK) {
        dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_END_XA);
        DMS_THROW_ERROR(ERR_SYSTEM_CALL, err);
        return err;
    }

    LOG_DEBUG_INF("[TXN][%s] end the xa remote, src_id = %u, dst_id = %u", cm_display_resid((char *)xid,
        DRC_RES_GLOBAL_XA_TYPE), dms_ctx->inst_id, owner_id);
    DDES_FAULT_INJECTION_CALL(DMS_FI_REQ_END_XA, MSG_REQ_END_XA);
    int32 ret = mfc_send_data(&req.head);
    if (ret != DMS_SUCCESS) {
        dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_END_XA);
        DMS_THROW_ERROR(ERRNO_DMS_SEND_MSG_FAILED, ret, MSG_REQ_END_XA, owner_id);
        return ERRNO_DMS_SEND_MSG_FAILED;
    }

    dms_message_t msg = { 0 };
    ret = mfc_get_response(req.head.ruid, &msg, DMS_WAIT_MAX_TIME);
    if (ret != DMS_SUCCESS) {
        dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_END_XA);
        LOG_DEBUG_ERR("[TXN][%s] wait owner ack timeout while end xa, timeout = %u ms", cm_display_resid((char *)xid,
            DRC_RES_GLOBAL_XA_TYPE), DMS_WAIT_MAX_TIME);
        DMS_THROW_ERROR(ERRNO_DMS_RECV_MSG_FAILED, ret, MSG_REQ_END_XA, owner_id);
        return ERRNO_DMS_RECV_MSG_FAILED;
    }

    if (msg.head->cmd == MSG_ACK_ERROR) {
        cm_print_error_msg_and_throw_error(msg.buffer);
        dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_END_XA);
        mfc_release_response(&msg);
        return ERRNO_DMS_COMMON_MSG_ACK;
    }

    CM_CHK_RESPONSE_SIZE(&msg, (uint32)sizeof(dms_end_xa_ack_t), CM_FALSE);
    dms_end_xa_ack_t *ack = (dms_end_xa_ack_t *)msg.buffer;
    *return_code = ack->return_code;
    mfc_release_response(&msg);
    dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_END_XA);
    return DMS_SUCCESS;
}

void dms_proc_end_xa(dms_process_context_t *proc_ctx, dms_message_t *receive_msg)
{
    CM_CHK_PROC_MSG_SIZE_NO_ERR(receive_msg, (uint32)sizeof(dms_end_xa_req_t), CM_TRUE);
    dms_end_xa_req_t req = *(dms_end_xa_req_t *)receive_msg->buffer;

    drc_global_xid_t *xid = &req.xa_xid;
    if (!dms_proc_check_xid_valid(xid)) {
        cm_send_error_msg(receive_msg->head, ERRNO_DMS_DRC_XA_RES_NOT_EXISTS, "invalid xid for xa end");
        return;
    }

    int32 ret = g_dms.callback.end_xa(proc_ctx->db_handle, xid, req.flags, req.commit_scn, req.is_commit);
    dms_end_xa_ack_t ack = { 0 };
    dms_init_ack_head(&req.head, &ack.head, MSG_ACK_END_XA, sizeof(dms_end_xa_ack_t), proc_ctx->sess_id);
    ack.return_code = (ret == DMS_SUCCESS ? ret : g_dms.callback.db_get_kernel_error_code());

    LOG_DEBUG_INF("[DMS][%s][dms_proc_end_xa]: src_id = %u, src_sid = %u",
        cm_display_resid((char *)xid, DRC_RES_GLOBAL_XA_TYPE), (uint8)req.head.src_inst, (uint16)req.head.src_sid);

    DDES_FAULT_INJECTION_CALL(DMS_FI_ACK_END_XA, MSG_ACK_END_XA);
    ret = mfc_send_data(&ack.head);
    if (ret == DMS_SUCCESS) {
        LOG_DEBUG_INF("[DMS][%s][dms_proc_end_xa]: finished, dst_id = %u, dst_sid = %u",
            cm_display_resid((char *)xid, DRC_RES_GLOBAL_XA_TYPE), (uint8)ack.head.dst_inst, (uint16)ack.head.dst_sid);
    } else {
        LOG_DEBUG_ERR("[DMS][%s][dms_proc_end_xa]: failed to send ack, dst_id = %u, dst_sid = %u, errcode = %u",
            cm_display_resid((char *)xid, DRC_RES_GLOBAL_XA_TYPE), (uint8)ack.head.dst_inst,
            (uint16)ack.head.dst_sid, ret);
    }
}

static int32 dms_ask_xa_inuse_remote(dms_context_t *dms_ctx, uint8 owner_id, bool8 *inuse)
{
    dms_ask_xa_inuse_req_t req = { 0 };
    drc_global_xid_t *global_xid = &dms_ctx->global_xid;

    dms_begin_stat(dms_ctx->sess_id, DMS_EVT_DCS_REQ_XA_IN_USE, CM_TRUE);
    DMS_INIT_MESSAGE_HEAD(&req.head, MSG_REQ_ASK_XA_IN_USE, 0, dms_ctx->inst_id, owner_id, dms_ctx->sess_id,
        CM_INVALID_ID16);

    req.head.size = (uint16)sizeof(dms_ask_xa_inuse_req_t);
    req.sess_type = dms_ctx->sess_type;
    errno_t err = memcpy_sp((char *)&req.xa_xid, sizeof(drc_global_xid_t), &dms_ctx->global_xid,
        sizeof(drc_global_xid_t));
    if (err != EOK) {
        dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_XA_IN_USE);
        DMS_THROW_ERROR(ERR_SYSTEM_CALL, err);
        return err;
    }

    LOG_DEBUG_INF("[TXN][%s][dms_ask_xa_inuse_remote]: src_id = %u, dst_id = %u", cm_display_resid((char *)global_xid,
        DRC_RES_GLOBAL_XA_TYPE), dms_ctx->inst_id, owner_id);
    DDES_FAULT_INJECTION_CALL(DMS_FI_REQ_ASK_XA_IN_USE, MSG_REQ_ASK_XA_IN_USE);
    int32 ret = mfc_send_data(&req.head);
    if (ret != DMS_SUCCESS) {
        dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_XA_IN_USE);
        DMS_THROW_ERROR(ERRNO_DMS_SEND_MSG_FAILED, ret, MSG_REQ_ASK_XA_IN_USE, owner_id);
        return ERRNO_DMS_SEND_MSG_FAILED;
    }

    dms_message_t msg = { 0 };
    ret = mfc_get_response(req.head.ruid, &msg, DMS_WAIT_MAX_TIME);
    if (ret != DMS_SUCCESS) {
        dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_XA_IN_USE);
        LOG_DEBUG_ERR("[TXN][%s][dms_ask_xa_inuse_remote]: wait if in use ack of xa res timeout, timeout = %d ms",
            cm_display_resid((char *)global_xid, DRC_RES_GLOBAL_XA_TYPE), DMS_WAIT_MAX_TIME);
        DMS_THROW_ERROR(ERRNO_DMS_RECV_MSG_FAILED, ret, MSG_REQ_ASK_XA_IN_USE, owner_id);
        return ERRNO_DMS_RECV_MSG_FAILED;
    }

    if (msg.head->cmd == MSG_ACK_ERROR) {
        cm_print_error_msg_and_throw_error(msg.buffer);
        dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_XA_IN_USE);
        mfc_release_response(&msg);
        return ERRNO_DMS_COMMON_MSG_ACK;
    }

    CM_CHK_RESPONSE_SIZE(&msg, sizeof(dms_ask_xa_inuse_ack_t), CM_FALSE);
    dms_ask_xa_inuse_ack_t *ack = (dms_ask_xa_inuse_ack_t *)msg.buffer;
    *inuse = ack->inuse;

    mfc_release_response(&msg);
    dms_end_stat_ex(dms_ctx->sess_id, DMS_EVT_DCS_REQ_XA_IN_USE);
    return DMS_SUCCESS;
}

int32 dms_request_xa_inuse(dms_context_t *dms_ctx, uint8 owner_id, bool8 *inuse)
{
    drc_global_xid_t *global_xid = &dms_ctx->global_xid;
    LOG_DEBUG_INF("[TXN][%s] enter dms_request_xa_inuse", cm_display_resid((char *)global_xid,
        DRC_RES_GLOBAL_XA_TYPE));

    if (owner_id == dms_ctx->inst_id) {
        *inuse = g_dms.callback.xa_inuse(dms_ctx->db_handle, (void *)global_xid);
        return DMS_SUCCESS;
    } else {
        return dms_ask_xa_inuse_remote(dms_ctx, owner_id, inuse);
    }
}

void dms_proc_ask_xa_inuse(dms_process_context_t *proc_ctx, dms_message_t *receive_msg)
{
    CM_CHK_PROC_MSG_SIZE_NO_ERR(receive_msg, (uint32)sizeof(dms_ask_xa_inuse_req_t), CM_TRUE);
    dms_ask_xa_inuse_req_t req = *(dms_ask_xa_inuse_req_t *)receive_msg->buffer;

    drc_global_xid_t *xid = &req.xa_xid;
    if (!dms_proc_check_xid_valid(xid)) {
        cm_send_error_msg(receive_msg->head, ERRNO_DMS_DRC_XA_RES_NOT_EXISTS, "invalid xid for xa ask inuse");
        return;
    }

    dms_ask_xa_inuse_ack_t ack = { 0 };
    dms_init_ack_head(&req.head, &ack.head, MSG_ACK_XA_IN_USE, sizeof(dms_ask_xa_inuse_ack_t), proc_ctx->sess_id);
    ack.inuse = g_dms.callback.xa_inuse(proc_ctx->db_handle, (void *)xid);

    LOG_DEBUG_INF("[DMS][%s][dms_proc_ask_xa_inuse]: src_id = %u, src_sid = %u",
        cm_display_resid((char *)xid, DRC_RES_GLOBAL_XA_TYPE), (uint8)req.head.src_inst, (uint16)req.head.src_sid);

    DDES_FAULT_INJECTION_CALL(DMS_FI_ACK_XA_IN_USE, MSG_ACK_XA_IN_USE);
    int32 ret = mfc_send_data(&ack.head);
    if (ret == DMS_SUCCESS) {
        LOG_DEBUG_INF("[DMS][%s][dms_proc_ask_xa_inuse]: finished, dst_id = %u, dst_sid = %u",
            cm_display_resid((char *)xid, DRC_RES_GLOBAL_XA_TYPE), (uint8)ack.head.dst_inst, (uint16)ack.head.dst_sid);
    } else {
        LOG_DEBUG_ERR("[DMS][%s][dms_proc_ask_xa_inuse]: failed to send ack, dst_id = %u, dst_sid = %u, errcode = %u",
            cm_display_resid((char *)xid, DRC_RES_GLOBAL_XA_TYPE), (uint8)ack.head.dst_inst,
            (uint16)ack.head.dst_sid, ret);
    }
}