/* -------------------------------------------------------------------------
* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* dtc_dcs.c
*
*
* IDENTIFICATION
* src/cluster/dtc_dcs.c
*
* -------------------------------------------------------------------------
*/
#include "knl_cluster_module.h"
#include "dtc_dcs.h"
#include "cm_defs.h"
#include "cm_thread.h"
#include "knl_context.h"
#include "srv_instance.h"
#include "pcr_heap.h"
#include "pcr_heap_undo.h"
#include "pcr_heap_scan.h"
#include "pcr_btree.h"
#include "pcr_btree_scan.h"
#include "dtc_drc.h"
#include "dtc_database.h"
#include "dtc_tran.h"
#include "dtc_dc.h"
#include "dtc_buffer.h"
#include "dtc_recovery.h"
#include "dtc_trace.h"
#include "dtc_ckpt.h"
#include "rc_reform.h"
#include "dtc_context.h"
/*
 * Lookup table: may a locally cached page be latched without going through DCS?
 * Row index: the requested latch mode (latch_mode_t value, rows 0..3).
 * Column index: the buffer ctrl's current DRC lock mode (0 .. DRC_LOCK_MODE_MAX - 1).
 * NOTE(review): column order presumably follows drc_lock_mode_e (NULL, SHARE, EXCLUSIVE) — confirm.
 */
bool32 dcs_page_latch_usable[][DRC_LOCK_MODE_MAX] = {
    [0] = { OG_FALSE, OG_FALSE, OG_FALSE },
    [1] = { OG_FALSE, OG_TRUE,  OG_TRUE },
    [2] = { OG_FALSE, OG_FALSE, OG_TRUE },
    [3] = { OG_FALSE, OG_TRUE,  OG_TRUE },
};
/*
 * Tell whether the local copy behind `ctrl` may be latched in `mode`, by looking up
 * dcs_page_latch_usable[mode][ctrl->lock_mode]. `session` is not used.
 */
bool32 dcs_local_page_usable(knl_session_t *session, buf_ctrl_t *ctrl, latch_mode_t mode)
{
    const bool32 *usable_for_mode = dcs_page_latch_usable[mode];

    (void)session;
    return usable_for_mode[ctrl->lock_mode];
}
/*
 * Send a MES message, retrying while the destination instance is still alive.
 *
 * The first attempt is unconditional. After every failure:
 *  - the destination id is validated (OG_ERROR if >= OG_MAX_INSTANCES);
 *  - the cluster view is fetched (treated as empty when DB_CLUSTER_NO_CMS);
 *  - if the destination is not in the alive bitmap, the last failure status is returned;
 *  - otherwise sleep DCS_RESEND_MSG_INTERVAL and send again.
 * A warning is logged every DCS_RESEND_MSG_TIMES failed attempts.
 *
 * @param msg  message buffer that starts with a mes_message_head_t
 * @return OG_SUCCESS once a send succeeds, otherwise the failure status
 */
status_t dcs_send_data_retry(void *msg)
{
    mes_message_head_t *head = (mes_message_head_t *)msg;
    uint32 attempts = 0;
    status_t status = OG_SUCCESS;

    SYNC_POINT_GLOBAL_START(OGRAC_DCS_SEND_EDP_MESSAGE_FAIL, &status, OG_ERROR);
    status = mes_send_data(msg);
    SYNC_POINT_GLOBAL_END;

    while (status != OG_SUCCESS) {
        cluster_view_t view;

        attempts++;
        if (head->dst_inst >= OG_MAX_INSTANCES) {
            OG_LOG_RUN_ERR("invalid inst id(%u)", head->dst_inst);
            return OG_ERROR;
        }

        rc_get_cluster_view(&view, OG_FALSE);
        if (DB_CLUSTER_NO_CMS) {
            view.bitmap = 0;
        }

        if (!rc_bitmap64_exist(&view.bitmap, head->dst_inst)) {
            OG_LOG_RUN_WAR_LIMIT(LOG_PRINT_INTERVAL_SECOND_20, "inst id(%u) is not alive, alive bitmap:%llu",
                head->dst_inst, view.bitmap);
            return status;
        }

        cm_sleep(DCS_RESEND_MSG_INTERVAL);
        SYNC_POINT_GLOBAL_START(OGRAC_DCS_SEND_EDP_MESSAGE_FAIL, &status, OG_ERROR);
        status = mes_send_data(msg);
        SYNC_POINT_GLOBAL_END;

        if (attempts % DCS_RESEND_MSG_TIMES == 0) {
            OG_LOG_RUN_WAR("send message failed times:%u, alive bitmap:%llu", attempts, view.bitmap);
        }
    }
    return status;
}
/*
 * Send a message made of a separate head and body (mes_send_data3), retrying while the
 * destination instance is alive in the cluster view.
 *
 * The destination id is validated before the first attempt. After each failure the cluster view
 * is fetched (treated as empty when DB_CLUSTER_NO_CMS); if the destination is gone the last
 * failure status is returned, otherwise sleep DCS_RESEND_MSG_INTERVAL and resend.
 * A warning is logged every DCS_RESEND_MSG_TIMES failed attempts.
 *
 * @param head       message head (dst_inst selects the destination)
 * @param head_size  number of head bytes to send
 * @param body       body bytes sent after the head
 * @return OG_SUCCESS once a send succeeds, otherwise an error status
 */
status_t dcs_send_data3_retry(mes_message_head_t *head, uint32 head_size, const void *body)
{
    uint32 attempts = 0;
    status_t status;

    if (head->dst_inst >= OG_MAX_INSTANCES) {
        OG_LOG_RUN_ERR("invalid inst id(%u)", head->dst_inst);
        return OG_ERROR;
    }

    for (status = mes_send_data3(head, head_size, body); status != OG_SUCCESS;) {
        cluster_view_t view;

        attempts++;
        rc_get_cluster_view(&view, OG_FALSE);
        if (DB_CLUSTER_NO_CMS) {
            view.bitmap = 0;
        }

        if (!rc_bitmap64_exist(&view.bitmap, head->dst_inst)) {
            OG_LOG_RUN_WAR("inst id(%u) is not alive, alive bitmap:%llu", head->dst_inst, view.bitmap);
            return status;
        }

        cm_sleep(DCS_RESEND_MSG_INTERVAL);
        status = mes_send_data3(head, head_size, body);

        if (attempts % DCS_RESEND_MSG_TIMES == 0) {
            OG_LOG_RUN_WAR("send message failed times:%u, alive bitmap:%llu", attempts, view.bitmap);
        }
    }
    return status;
}
/*
 * Send a MES_CMD_CLAIM_OWNER_REQ to a remote master, claiming ownership of page_id in `mode`.
 *
 * @param master_id    master instance of the page
 * @param has_edp      value carried in the claim's has_edp field
 * @param page_lsn     page lsn reported to the master
 * @param req_version  reform version the request belongs to
 * @return result of dcs_send_data_retry
 */
static inline status_t dcs_claim_ownership_r(knl_session_t *session, uint32 master_id, page_id_t page_id,
    bool32 has_edp, drc_lock_mode_e mode, uint64 page_lsn, uint64 req_version)
{
    msg_claim_owner_t claim;
    status_t ret;

    mes_init_send_head(&claim.head, MES_CMD_CLAIM_OWNER_REQ, sizeof(msg_claim_owner_t), OG_INVALID_ID32,
        session->kernel->dtc_attr.inst_id, master_id, session->id, OG_INVALID_ID16);
    claim.page_id = page_id;
    claim.mode = mode;
    claim.has_edp = has_edp;
    claim.req_version = req_version;
    claim.lsn = page_lsn;

    SYNC_POINT_GLOBAL_START(OGRAC_DCS_CLAIM_OWNER_SEND_FAIL, &ret, OG_ERROR);
    ret = dcs_send_data_retry(&claim);
    SYNC_POINT_GLOBAL_END;

    DTC_DCS_DEBUG(ret, "[DCS][%u-%u][%s]: src_id=%u, dest_id=%u, has_edp=%u, req mode=%d, page lsn=%llu",
        page_id.file, page_id.page, MES_CMD2NAME(claim.head.cmd), claim.head.src_inst, claim.head.dst_inst,
        claim.has_edp, claim.mode, page_lsn);
    return ret;
}
/*
 * Forward a pending conversion (cvt_info) to the page owner as a MES_CMD_ASK_OWNER request.
 * The request is built on behalf of the original requester (cvt_info->req_id / req_sid / req_rsn).
 * Fails without sending when DRC_STOP_DCS_IO_FOR_REFORMING reports a reform in progress.
 *
 * @return OG_ERROR when stopped by reforming, otherwise the result of dcs_send_data_retry
 */
static inline status_t dcs_notify_local_owner4page(knl_session_t *session, cvt_info_t *cvt_info)
{
    msg_page_req_t req;
    status_t ret;

    mes_init_send_head(&req.head, MES_CMD_ASK_OWNER, sizeof(msg_page_req_t), cvt_info->req_rsn, cvt_info->req_id,
        cvt_info->owner_id, cvt_info->req_sid, OG_INVALID_ID16);
    req.page_id = cvt_info->pageid;
    req.req_version = cvt_info->req_version;
    req.req_mode = cvt_info->req_mode;
    req.curr_mode = cvt_info->curr_mode;
    req.lsn = cvt_info->lsn;
    req.action = DRC_RES_INVALID_ACTION;
    req.is_retry = OG_FALSE;

    if (DRC_STOP_DCS_IO_FOR_REFORMING(req.req_version, session, req.page_id)) {
        OG_LOG_RUN_ERR("[DCS][%u-%u]: reforming, notify local owner4page failed, req_rsn=%u, "
            "req_version=%llu, cur_version=%llu",
            req.page_id.file, req.page_id.page, cvt_info->req_rsn, req.req_version,
            DRC_GET_CURR_REFORM_VERSION);
        return OG_ERROR;
    }

    SYNC_POINT_GLOBAL_START(OGRAC_DCS_NOTIFY_OWNER_SEND_FAIL, &ret, OG_ERROR);
    ret = dcs_send_data_retry(&req);
    SYNC_POINT_GLOBAL_END;

    DTC_DCS_DEBUG(ret,
        "[DCS][%u-%u][%s internal]: status=(%d), src_id=%u, src_sid=%u, "
        "dest_id=%u, dest_sid=%u, req_mode=%u, curr_mode=%u, copy_insts=%llu",
        cvt_info->pageid.file, cvt_info->pageid.page, MES_CMD2NAME(req.head.cmd), ret,
        req.head.src_inst, req.head.src_sid, req.head.dst_inst, req.head.dst_sid,
        req.req_mode, req.curr_mode, cvt_info->readonly_copies);

    SYNC_POINT_GLOBAL_START(OGRAC_DCS_NOTIFY_OWNER_SUCC_ABORT, NULL, 0);
    SYNC_POINT_GLOBAL_END;
    return ret;
}
extern status_t dcs_try_notify_owner_for_page(knl_session_t *session, cvt_info_t *cvt_info);

/*
 * Record a claim in the local DRC (drc_claim_page_owner) and, if it yields a pending conversion
 * (cvt_info.req_id valid), notify that conversion's owner: directly when the owner is this
 * instance, otherwise through dcs_try_notify_owner_for_page.
 */
static status_t dcs_claim_ownership_internal(knl_session_t *session, claim_info_t *claim_info, uint64 req_version)
{
    cvt_info_t next_cvt;

    drc_claim_page_owner(session, claim_info, &next_cvt, req_version);
    DTC_DCS_DEBUG_INF("[DCS][%u-%u][claim owner]: src_id=%u, mode=%u, has_edp=%u, page lsn=%llu",
        claim_info->page_id.file, claim_info->page_id.page, DCS_SELF_INSTID(session), claim_info->mode,
        claim_info->has_edp, claim_info->lsn);

    /* Invalid requester id: presumably no conversion is waiting, so nothing left to do. */
    if (next_cvt.req_id == OG_INVALID_ID8) {
        return OG_SUCCESS;
    }
    if (next_cvt.owner_id != DCS_SELF_INSTID(session)) {
        return dcs_try_notify_owner_for_page(session, &next_cvt);
    }
    return dcs_notify_local_owner4page(session, &next_cvt);
}
/*
 * Claim ownership of page_id when this instance is the page's master: build a claim_info_t for
 * this instance/session and hand it to dcs_claim_ownership_internal.
 */
static status_t dcs_claim_ownership_l(knl_session_t *session, page_id_t page_id, drc_lock_mode_e mode, bool32 has_edp,
    uint64 page_lsn, uint64 req_version)
{
    claim_info_t local_claim;

    DRC_SET_CLAIM_INFO(&local_claim, DCS_SELF_INSTID(session), session->id, page_id, has_edp, mode, page_lsn);
    return dcs_claim_ownership_internal(session, &local_claim, req_version);
}
/*
 * Apply a MES_CMD_PAGE_READY ack received from the page owner to the local buffer ctrl.
 * - When the message is larger than one page, the page image following the ack header is copied
 *   into ctrl->page.
 * - Non-zero lsn/scn carried by the ack are pushed via dtc_update_lsn / dtc_update_scn.
 * - The granted lock mode, edp map, dirty flags and load status are recorded on the ctrl.
 *
 * @param session  current kernel session
 * @param ctrl     local buffer ctrl of the page; DCS_BUF_CTRL_NOT_OWNER must hold (panics otherwise)
 * @param msg      received ack; msg->buffer starts with msg_ask_page_ack_t
 * @param mode     granted lock mode; must equal ack->mode (panics otherwise)
 */
static void dcs_handle_page_from_owner(knl_session_t *session, buf_ctrl_t *ctrl, mes_message_t *msg,
drc_lock_mode_e mode)
{
msg_ask_page_ack_t *ack = (msg_ask_page_ack_t *)(msg->buffer);
uint8 flags = msg->head->flags;
uint16 size = msg->head->size;
knl_panic(ack->head.cmd == MES_CMD_PAGE_READY);
knl_panic(DCS_BUF_CTRL_NOT_OWNER(session, ctrl));
/* A message bigger than one page carries the page image right after the ack header. */
if (size > DEFAULT_PAGE_SIZE(session)) {
/* A page image is never combined with NEED_LOAD / READONLY2X and must not be older than the local copy. */
knl_panic(!(flags & MES_FLAG_NEED_LOAD) && !(flags & MES_FLAG_READONLY2X));
uint64 new_lsn = ((page_head_t *)(msg->buffer + sizeof(msg_ask_page_ack_t)))->lsn;
knl_panic(new_lsn >= dtc_get_ctrl_lsn(ctrl));
errno_t err = memcpy_sp(ctrl->page, DEFAULT_PAGE_SIZE(session), msg->buffer + sizeof(msg_ask_page_ack_t),
DEFAULT_PAGE_SIZE(session));
knl_securec_check(err);
session->stat->dcs_buffer_gets++;
}
/* Zero lsn/scn means the owner did not send a value to synchronize. */
if (ack->lsn != 0) {
dtc_update_lsn(session, ack->lsn);
}
if (ack->scn != 0) {
dtc_update_scn(session, ack->scn);
}
/* The page must be (re)loaded: drop the local edp state first. */
if ((flags & MES_FLAG_NEED_LOAD) && ctrl->is_edp) {
DTC_DCS_DEBUG_INF("[DCS][%u-%u][%s]: need load, and clean edp, dirty=%u", ctrl->page_id.file,
ctrl->page_id.page, MES_CMD2NAME(msg->head->cmd), ctrl->is_dirty);
dcs_buf_clean_ctrl_edp(session, ctrl, OG_TRUE);
}
knl_panic(mode == ack->mode);
ctrl->lock_mode = mode;
/* Keep the owner's edp map minus this instance's own bit. */
ctrl->edp_map = (ack->edp_map) & (~(1ULL << session->kernel->id));
ctrl->is_fixed = 0;
/* Exclusive grant: the local copy is no longer an edp; the owner's dirty/remote-dirty state becomes remote dirty. */
if (ctrl->lock_mode == DRC_LOCK_EXCLUSIVE) {
ctrl->is_edp = 0;
ctrl->is_remote_dirty = DCS_ACK_PG_IS_DIRTY(msg) || DCS_ACK_PG_IS_REMOTE_DIRTY(msg);
} else {
ctrl->is_remote_dirty = 0;
}
ctrl->force_request = 0;
/* Full fence before publishing load_status; presumably so readers of load_status see the fields above. */
CM_MFENCE;
if (flags & MES_FLAG_NEED_LOAD) {
ctrl->load_status = (uint8)BUF_NEED_LOAD;
} else {
ctrl->load_status = (uint8)BUF_IS_LOADED;
}
ctrl->in_recovery = OGRAC_SESSION_IN_RECOVERY(session);
DTC_DCS_DEBUG_INF(
"[DCS][%u-%u][%s]: handle page from owner, lock mode=%u, edp=%d, edp_map=%llu, src_id=%u, src_sid=%u,"
"dest_id=%u, dest_sid=%u, mode=%u, dirty=%u, remote dirty=%u, remote remote diry=%u, page pcn=%d,"
"page lsn=%llu, sync lsn=%llu, sync scn=%llu, page_type=%u, load_status=%d, in_recovery=%d",
ctrl->page_id.file, ctrl->page_id.page, MES_CMD2NAME(msg->head->cmd), ctrl->lock_mode, ctrl->is_edp,
ctrl->edp_map, msg->head->src_inst, msg->head->src_sid, msg->head->dst_inst, msg->head->dst_sid, mode,
ctrl->is_dirty, ctrl->is_remote_dirty, DCS_ACK_PG_IS_REMOTE_DIRTY(msg), ctrl->page->pcn, ctrl->page->lsn,
ack->lsn, ack->scn, ((heap_page_t *)ctrl->page)->head.type, ctrl->load_status, ctrl->in_recovery);
}
/*
 * Send a MES_CMD_ASK_MASTER request for ctrl->page_id to the page master.
 *
 * Every field of msg_page_req_t is filled before sending. In particular, `action` and `is_retry`
 * get neutral defaults, so uninitialized stack bytes are never sent to the master (the owner side
 * validates `action` and reads `is_retry`).
 *
 * @param master_id    master instance of the page
 * @param ctrl         local buffer ctrl; its lock_mode is sent as curr_mode and its lsn as lsn
 * @param req_mode     requested DRC lock mode
 * @param req_version  reform version the request belongs to
 * @return OG_SUCCESS on send; otherwise OG_ERROR with ERR_DCS_MSG_EAGAIN thrown
 */
static status_t dcs_send_ask_master_req(knl_session_t *session, uint8 master_id, buf_ctrl_t *ctrl,
    drc_lock_mode_e req_mode, uint64 req_version)
{
    msg_page_req_t page_req;
    status_t ret;

    mes_init_send_head(&page_req.head, MES_CMD_ASK_MASTER, sizeof(msg_page_req_t), OG_INVALID_ID32,
        DCS_SELF_INSTID(session), master_id, DCS_SELF_SID(session), OG_INVALID_ID16);
    page_req.page_id = ctrl->page_id;
    page_req.req_mode = req_mode;
    page_req.curr_mode = ctrl->lock_mode;
    page_req.req_version = req_version;
    page_req.lsn = dtc_get_ctrl_lsn(ctrl);
    /* Not decided by the requester: send defined defaults instead of stack garbage. */
    page_req.action = DRC_RES_INVALID_ACTION;
    page_req.is_retry = OG_FALSE;

    DTC_DCS_DEBUG_INF(
        "[DCS][%u-%u][%s]: src_id=%u, dest_id=%u, req_mode=%u, curr_mode=%u, is_dirty=%d, is_edp=%d, is_remote_dirty=%d, pcn=%d, lsn=%llu",
        ctrl->page_id.file, ctrl->page_id.page, MES_CMD2NAME(MES_CMD_ASK_MASTER), DCS_SELF_INSTID(session), master_id,
        req_mode, ctrl->lock_mode, ctrl->is_dirty, ctrl->is_edp, ctrl->is_remote_dirty, ctrl->page->pcn,
        ctrl->page->lsn);

    SYNC_POINT_GLOBAL_START(OGRAC_DCS_ASK_MASTER_SEND_FAIL, &ret, OG_ERROR);
    ret = dcs_send_data_retry(&page_req);
    SYNC_POINT_GLOBAL_END;
    if (ret == OG_SUCCESS) {
        SYNC_POINT_GLOBAL_START(OGRAC_DCS_ASK_MASTER_SUCC_ABORT, NULL, 0);
        SYNC_POINT_GLOBAL_END;
        return OG_SUCCESS;
    }

    /* Replace the low-level send error with a retryable DCS error for the caller. */
    cm_reset_error();
    OG_THROW_ERROR(ERR_DCS_MSG_EAGAIN, "failed to send ask master request. Try again later");
    return OG_ERROR;
}
/*
 * Release the session's current page (buf_curr_page): unlatch its ctrl, then pop it
 * from the session's page list with buf_pop_page.
 * Storage-class specifier placed first; `void static inline` is an obsolescent form (C11 6.11.5).
 */
static inline void dcs_leave_page(knl_session_t *session)
{
    buf_ctrl_t *ctrl = buf_curr_page(session);

    buf_unlatch_page(session, ctrl);
    buf_pop_page(session);
}
/*
 * Mark the local ctrl as granted exclusively by the master with the page to be loaded from disk:
 * load_status = BUF_NEED_LOAD, lock_mode = DRC_LOCK_EXCLUSIVE, edp state cleared, transfer state reset.
 * A pending edp must be a dirty, non-readonly page (panics otherwise) and is cleaned first.
 */
static inline void dcs_set_ctrl4granted(knl_session_t *session, buf_ctrl_t *ctrl)
{
    ctrl->load_status = (uint8)BUF_NEED_LOAD;
    if (ctrl->is_edp) {
        knl_panic(ctrl && ctrl->is_dirty && ctrl->is_edp && !ctrl->is_readonly);
        dcs_buf_clean_ctrl_edp(session, ctrl, OG_TRUE);
        /* lsn is unsigned 64-bit: print it with %llu. */
        DTC_DCS_DEBUG_INF("[DCS]edp page[%u-%u] (lsn:%llu) is ok", ctrl->page_id.file, ctrl->page_id.page,
            ctrl->page->lsn);
    }
    ctrl->lock_mode = DRC_LOCK_EXCLUSIVE;
    ctrl->edp_map = 0;
    ctrl->is_fixed = 0;
    ctrl->force_request = 0;
    ctrl->transfer_status = BUF_TRANS_NONE;
    ctrl->in_recovery = OGRAC_SESSION_IN_RECOVERY(session);
    DTC_DCS_DEBUG_INF("[DCS][%u-%u][dcs set ctrl4granted] success", ctrl->page_id.file, ctrl->page_id.page);
}
/*
 * Update the local ctrl after the master answered that this instance already owns the page.
 * Clears is_fixed, cleans a pending edp when the master attached an action (SHARE or EXCLUSIVE only),
 * then sets lock_mode to req_mode and resets transfer_status / in_recovery.
 */
static inline void dcs_set_ctrl4already_owner(knl_session_t *session, buf_ctrl_t *ctrl, drc_lock_mode_e req_mode,
    uint8 action)
{
    ctrl->is_fixed = 0;

    if (ctrl->is_edp && action != DRC_RES_INVALID_ACTION) {
        knl_panic(action == DRC_RES_SHARE_ACTION || action == DRC_RES_EXCLUSIVE_ACTION);
        dcs_clean_local_ctrl(session, ctrl, action, OG_INVALID_ID64);
    }
    knl_panic(ctrl->is_edp == 0);

    if (req_mode != DRC_LOCK_EXCLUSIVE) {
        CM_ASSERT(req_mode == DRC_LOCK_SHARE);
        ctrl->lock_mode = DRC_LOCK_SHARE;
    } else {
        ctrl->lock_mode = DRC_LOCK_EXCLUSIVE;
        ctrl->is_edp = 0;
    }

    ctrl->transfer_status = BUF_TRANS_NONE;
    ctrl->in_recovery = OGRAC_SESSION_IN_RECOVERY(session);

#ifdef DB_DEBUG_VERSION
    if (ctrl->load_status == BUF_NEED_LOAD) {
        DTC_DCS_DEBUG_INF("[DCS][%u-%u]: set ctrl already owner, need load page, lock mode(%d).", ctrl->page_id.file,
            ctrl->page_id.page, ctrl->lock_mode);
    }
#endif
}
/*
 * Handle MES_CMD_MASTER_ACK_NEED_LOAD: sync lsn/scn from the master's ack and mark the ctrl as
 * exclusively granted with the page still to be loaded. `mode` is not used.
 *
 * @return OG_ERROR when stopped by reforming, OG_SUCCESS otherwise
 */
static inline status_t dcs_handle_ack_need_load(knl_session_t *session, mes_message_t *msg, buf_ctrl_t *ctrl,
    drc_lock_mode_e mode)
{
    msg_pg_ack_ld_t *ld_ack = (msg_pg_ack_ld_t *)msg->buffer;
    page_id_t pid = ctrl->page_id;

    (void)mode;
    if (DRC_STOP_DCS_IO_FOR_REFORMING(ld_ack->req_version, session, pid)) {
        OG_LOG_RUN_ERR("[DCS][%u-%u]reforming, handle ack need_load failed, req_version=%llu, cur_version=%llu",
            pid.file, pid.page, ld_ack->req_version, DRC_GET_CURR_REFORM_VERSION);
        return OG_ERROR;
    }

    dtc_update_lsn(session, ld_ack->master_lsn);
    dtc_update_scn(session, ld_ack->scn);
    dcs_set_ctrl4granted(session, ctrl);
    return OG_SUCCESS;
}
/*
 * Handle MES_CMD_MASTER_ACK_ALREADY_OWNER: update the local ctrl to the granted mode and then claim
 * ownership with the master (remote or local). The claim's send status is deliberately ignored.
 *
 * @return OG_ERROR when stopped by reforming, OG_SUCCESS otherwise
 */
static inline status_t dcs_handle_ack_already_owner(knl_session_t *session, uint32 master_id, mes_message_t *msg,
    buf_ctrl_t *ctrl, drc_lock_mode_e mode)
{
    msg_ack_owner_t *ack = (msg_ack_owner_t *)msg->buffer;
    page_id_t pid = ctrl->page_id;
    uint64 latest_lsn;

    if (DRC_STOP_DCS_IO_FOR_REFORMING(ack->req_version, session, pid)) {
        OG_LOG_RUN_ERR("[DCS][%u-%u]reforming, handle ack already owner failed, masterId=%u, "
            "req_version=%llu, cur_version=%llu",
            pid.file, pid.page, master_id, ack->req_version, DRC_GET_CURR_REFORM_VERSION);
        return OG_ERROR;
    }

    knl_panic(mode <= ack->req_mode);
    dcs_set_ctrl4already_owner(session, ctrl, ack->req_mode, ack->action);

    latest_lsn = dtc_get_ctrl_latest_lsn(ctrl);
    if (master_id == DCS_SELF_INSTID(session)) {
        (void)dcs_claim_ownership_l(session, ctrl->page_id, ack->req_mode, OG_FALSE, latest_lsn, ack->req_version);
    } else {
        (void)dcs_claim_ownership_r(session, master_id, ctrl->page_id, OG_FALSE, ack->req_mode, latest_lsn,
            ack->req_version);
    }
    return OG_SUCCESS;
}
/*
 * Handle a page-ready ack from the owner: apply the page to the local ctrl using the mode granted
 * in the ack, then claim ownership with the master (remote or local). The claim's send status is ignored.
 *
 * @param lock_mode  mode originally requested; must not exceed the granted mode (panics otherwise)
 * @return OG_ERROR when stopped by reforming, OG_SUCCESS otherwise
 */
static inline status_t dcs_handle_ack_page_ready(knl_session_t *session, uint32 master_id, mes_message_t *msg,
    buf_ctrl_t *ctrl, drc_lock_mode_e lock_mode)
{
    msg_ask_page_ack_t *ack = (msg_ask_page_ack_t *)(msg->buffer);
    page_id_t pid = ctrl->page_id;

    if (DRC_STOP_DCS_IO_FOR_REFORMING(ack->req_version, session, pid)) {
        OG_LOG_RUN_ERR("[DCS][%u-%u]reforming, handle ack page ready failed, masterId=%u,"
            "req_version=%llu, cur_version=%llu",
            pid.file, pid.page, master_id, ack->req_version, DRC_GET_CURR_REFORM_VERSION);
        return OG_ERROR;
    }

    /* Continue with the mode granted by the owner, which must be >= the requested one. */
    uint8 ack_mode = ack->mode;
    drc_lock_mode_e mode = lock_mode;
    knl_panic(mode <= ack_mode);
    mode = ack_mode;

    dcs_handle_page_from_owner(session, ctrl, msg, mode);

    uint64 latest_lsn = dtc_get_ctrl_latest_lsn(ctrl);
    if (master_id == DCS_SELF_INSTID(session)) {
        (void)dcs_claim_ownership_l(session, ctrl->page_id, mode, DCS_ACK_PG_IS_DIRTY(msg), latest_lsn,
            ack->req_version);
    } else {
        (void)dcs_claim_ownership_r(session, master_id, ctrl->page_id, DCS_ACK_PG_IS_DIRTY(msg), mode, latest_lsn,
            ack->req_version);
    }
    return OG_SUCCESS;
}
/*
 * Wait (up to DCS_WAIT_MSG_TIMEOUT) for the reply to an ask-master request and dispatch it:
 * NEED_LOAD, ALREADY_OWNER, ERROR_MSG, or otherwise a page-ready ack sent by the owner.
 *
 * @param ack_event  optional; set to DCS_REQ_MASTER4PAGE_2WAY, or DCS_REQ_MASTER4PAGE_3WAY when the
 *                   reply came through the owner (default branch)
 * @return OG_ERROR on timeout, error message or handler failure; the handler's status otherwise
 */
static status_t dcs_handle_ask_master_ack(knl_session_t *session, uint8 master_id, buf_ctrl_t *ctrl,
    drc_lock_mode_e mode, wait_event_t *ack_event)
{
    mes_message_t msg;
    status_t ret;

    if (ack_event != NULL) {
        *ack_event = DCS_REQ_MASTER4PAGE_2WAY;
    }

    ret = mes_recv(session->id, &msg, OG_TRUE, mes_get_current_rsn(session->id), DCS_WAIT_MSG_TIMEOUT);
    if (SECUREC_UNLIKELY(ret != OG_SUCCESS)) {
        OG_LOG_RUN_ERR("[DCS][%u-%u][wait for master ack]: timeout, timeout=%u ms", ctrl->page_id.file,
            ctrl->page_id.page, DCS_WAIT_MSG_TIMEOUT);
        return OG_ERROR;
    }

    SYNC_POINT_GLOBAL_START(OGRAC_DCS_ASK_MASTER_ACK_SUCC_ABORT, NULL, 0);
    SYNC_POINT_GLOBAL_END;
    DTC_DCS_DEBUG_INF("[DCS][%u-%u][%s]: src_id=%u, dest_id=%u, flag=%u", ctrl->page_id.file, ctrl->page_id.page,
        MES_CMD2NAME(msg.head->cmd), msg.head->src_inst, msg.head->dst_inst, msg.head->flags);

    if (msg.head->cmd == MES_CMD_MASTER_ACK_NEED_LOAD) {
        ret = dcs_handle_ack_need_load(session, &msg, ctrl, DRC_LOCK_EXCLUSIVE);
    } else if (msg.head->cmd == MES_CMD_MASTER_ACK_ALREADY_OWNER) {
        ret = dcs_handle_ack_already_owner(session, master_id, &msg, ctrl, mode);
    } else if (msg.head->cmd == MES_CMD_ERROR_MSG) {
        ret = OG_ERROR;
    } else {
        ret = dcs_handle_ack_page_ready(session, master_id, &msg, ctrl, mode);
        if (ack_event != NULL) {
            *ack_event = DCS_REQ_MASTER4PAGE_3WAY;
        }
    }

    mes_release_message_buf(msg.buffer);
    return ret;
}
/*
 * Ask a remote master for the page behind `ctrl` in `mode` and process the reply.
 * The session wait starts as DCS_REQ_MASTER4PAGE_2WAY. It is closed as the event reported by
 * dcs_handle_ask_master_ack: 2-way, or re-attributed with knl_end_session_wait_ex when the reply
 * took another path.
 */
static inline status_t dcs_ask_master4page_r(knl_session_t *session, buf_ctrl_t *ctrl, uint8 master_id,
    drc_lock_mode_e mode)
{
    uint64 req_version = DRC_GET_CURR_REFORM_VERSION;
    wait_event_t event = DCS_REQ_MASTER4PAGE_2WAY;
    status_t ret;

    knl_begin_session_wait(session, DCS_REQ_MASTER4PAGE_2WAY, OG_TRUE);
    ret = dcs_send_ask_master_req(session, master_id, ctrl, mode, req_version);
    if (ret != OG_SUCCESS) {
        knl_end_session_wait(session, DCS_REQ_MASTER4PAGE_2WAY);
        return ret;
    }

    ret = dcs_handle_ask_master_ack(session, master_id, ctrl, mode, &event);
    if (event == DCS_REQ_MASTER4PAGE_2WAY) {
        knl_end_session_wait(session, event);
    } else {
        knl_end_session_wait_ex(session, DCS_REQ_MASTER4PAGE_2WAY, event);
    }
    return ret;
}
/*
 * Ask a remote master, in one batch message, to try to grant exclusive ownership of `count` pages.
 *
 * The request is sent once (no retry) and the reply is awaited for up to MES_WAIT_MAX_TIME.
 * Time spent waiting is added to session->stat->dcs_net_time.
 *
 * @param page_ids   page ids to request; copied into the message, bounded by its page id array
 * @param count      number of pages; the master's ack must report the same count (panics otherwise)
 * @param master_id  master instance to ask
 * @param result     receives `count` drc_req_owner_result_t entries on success
 * @return OG_SUCCESS when a MES_CMD_TRY_ASK_MASTER_ACK is received, OG_ERROR otherwise
 */
static status_t dcs_try_get_page_exclusive_owner_r(knl_session_t *session, page_id_t *page_ids, uint32 count,
    uint32 master_id, drc_req_owner_result_t *result)
{
    msg_page_req_batch_t page_req;
    mes_message_t msg;
    errno_t err_s;
    status_t ret;

    mes_init_send_head(&page_req.head, MES_CMD_TRY_ASK_MASTER, sizeof(msg_page_req_batch_t), OG_INVALID_ID32,
        DCS_SELF_INSTID(session), master_id, DCS_SELF_SID(session), OG_INVALID_ID16);
    page_req.count = count;
    /* Bound the copy by the message's real page id capacity, not by the caller-supplied count. */
    err_s = memcpy_s(page_req.page_ids, sizeof(page_req.page_ids), page_ids, count * sizeof(page_id_t));
    knl_securec_check(err_s);
    page_req.req_version = DRC_GET_CURR_REFORM_VERSION;

    knl_begin_session_wait(session, DCS_REQ_MASTER4PAGE_TRY, OG_TRUE);
    if (mes_send_data(&page_req) != OG_SUCCESS) {
        knl_end_session_wait(session, DCS_REQ_MASTER4PAGE_TRY);
        OG_LOG_RUN_ERR("[DCS][%s]: failed to send msg, src_id=%u, src_sid=%u, dest_id=%u",
            MES_CMD2NAME(page_req.head.cmd), page_req.head.src_inst, page_req.head.src_sid,
            page_req.head.dst_inst);
        return OG_ERROR;
    }

    if (mes_recv(session->id, &msg, OG_TRUE, page_req.head.rsn, MES_WAIT_MAX_TIME) != OG_SUCCESS) {
        knl_end_session_wait(session, DCS_REQ_MASTER4PAGE_TRY);
        OG_LOG_RUN_ERR("[DCS][%s]: ack timeout, src_id=%u, src_sid=%u, dest_id=%u", MES_CMD2NAME(page_req.head.cmd),
            page_req.head.src_inst, page_req.head.src_sid, page_req.head.dst_inst);
        return OG_ERROR;
    }
    knl_end_session_wait(session, DCS_REQ_MASTER4PAGE_TRY);

    ret = (msg.head->cmd == MES_CMD_TRY_ASK_MASTER_ACK) ? OG_SUCCESS : OG_ERROR;
    session->stat->dcs_net_time += session->wait_pool[DCS_REQ_MASTER4PAGE_TRY].usecs;
    if (ret == OG_SUCCESS) {
        msg_owner_req_t *owner_ack = (msg_owner_req_t *)(msg.buffer);
        /* Validate the ack before copying, so a mismatched reply never fills `result` with unrelated entries. */
        knl_panic(count == owner_ack->count);
        err_s = memcpy_s(result, count * sizeof(drc_req_owner_result_t), owner_ack->result,
            count * sizeof(drc_req_owner_result_t));
        knl_securec_check(err_s);
    }
    mes_release_message_buf(msg.buffer);
    return ret;
}
/*
 * Try to X-latch the local buffer of page_req->page_id so the owner can answer a transfer request.
 *
 * - buf_try_latchx_page returns NULL: *return_ctrl = NULL and OG_SUCCESS (need_load left untouched).
 * - ctrl in BUF_LOAD_FAILED and recyclable: page pushed on the session, *need_load = OG_TRUE, OG_SUCCESS.
 * - ctrl in BUF_LOAD_FAILED, not recyclable: page id / lsn are validated, and a request that is
 *   neither owned here nor a retry is rejected as a stale message.
 * - In all remaining cases the ctrl must be owned here or the request must be a retry. The requester's
 *   curr_mode must not be EXCLUSIVE. An exclusive request marks the ctrl BUF_TRANS_REL_OWNER, and the
 *   page is pushed on the session with *need_load = OG_FALSE.
 * On OG_ERROR the latch is released and *return_ctrl is reset to NULL.
 */
static status_t dcs_read_local_page4transfer(knl_session_t *session, msg_page_req_t *page_req, buf_ctrl_t **return_ctrl,
    bool32 *need_load)
{
    buf_ctrl_t *ctrl;

    ctrl = buf_try_latchx_page(session, page_req->page_id, (page_req->req_mode == DRC_LOCK_EXCLUSIVE));
    if (ctrl == NULL) {
        OG_LOG_DEBUG_WAR("[DCS][%u-%u][buf_read_local_page_for_transfer]: not found in memory", page_req->page_id.file,
            page_req->page_id.page);
        *return_ctrl = NULL;
        return OG_SUCCESS;
    }

    *need_load = OG_FALSE;
    *return_ctrl = ctrl;

    if (ctrl->load_status == (uint8)BUF_LOAD_FAILED) {
        /*
         * The ctrl is BUF_LOAD_FAILED; known cases:
         * 1) page has been swapped out and just been swapped in and ask master failed. lock_mode is NULL in this case.
         * 2) requester retry to ask master/owner for page and owner has already changed its ctrl status, lock_mode is
         *    null.
         * 3) In page prefetch for NORMAL or NO-READ page, we try to fetch owners for an extent of pages,
         *    lock_mode of those pages are DRC_LOCK_EXCLUSIVE. But it's load_status may be load_failed for NO-READ
         *    page.
         */
        OG_LOG_DEBUG_WAR(
            "[DCS][%u-%u][buf_read_local_page_for_transfer]: found in memory, but lock is null, is_dirty(%d), remote_dirty(%d), edp(%d), can evict(%d)",
            page_req->page_id.file, page_req->page_id.page, ctrl->is_dirty, ctrl->is_remote_dirty, ctrl->is_edp,
            BUF_IN_USE_IS_RECYCLABLE(ctrl));
        if (BUF_IN_USE_IS_RECYCLABLE(ctrl)) {
            *need_load = OG_TRUE;
            session->curr_page = (char *)ctrl->page;
            session->curr_page_ctrl = ctrl;
            buf_push_page(session, ctrl, LATCH_MODE_X);
            return OG_SUCCESS;
        }

        CM_ASSERT(IS_SAME_PAGID(AS_PAGID(ctrl->page->id), ctrl->page_id) && (page_req->lsn <= ctrl->page->lsn));
        if (!IS_SAME_PAGID(AS_PAGID(ctrl->page->id), ctrl->page_id) || !(page_req->lsn <= ctrl->page->lsn)) {
            OG_LOG_RUN_ERR("[DCS] invalid page id %u-%u, %u-%u, or lsn %llu-%llu", AS_PAGID(ctrl->page->id).file,
                AS_PAGID(ctrl->page->id).page, ctrl->page_id.file, ctrl->page_id.page, page_req->lsn,
                ctrl->page->lsn);
            buf_unlatch_page(session, ctrl);
            *return_ctrl = NULL;
            return OG_ERROR;
        }

        if (!DCS_BUF_CTRL_IS_OWNER(session, ctrl) && !page_req->is_retry) {
            /* Not the owner and not a retry: this is an old message, just skip and return error. */
            OG_LOG_RUN_WAR(
                "[DCS][%u-%u][buf_read_local_page_for_transfer]: not owner and is an old msg, skip, is_dirty(%d), remote_dirty(%d), edp(%d), can evict(%d)",
                page_req->page_id.file, page_req->page_id.page, ctrl->is_dirty, ctrl->is_remote_dirty, ctrl->is_edp,
                BUF_IN_USE_IS_RECYCLABLE(ctrl));
            buf_unlatch_page(session, ctrl);
            *return_ctrl = NULL;
            return OG_ERROR;
        }
    }

    CM_ASSERT(DCS_BUF_CTRL_IS_OWNER(session, ctrl) || page_req->is_retry);
    if (!DCS_BUF_CTRL_IS_OWNER(session, ctrl) && !page_req->is_retry) {
        OG_LOG_RUN_ERR("[DCS] invalid ctrl or page_req, lock_mode %d, is_edp %d, or page_req->is_retry %d",
            ctrl->lock_mode, ctrl->is_edp, page_req->is_retry);
        buf_unlatch_page(session, ctrl);
        *return_ctrl = NULL;
        return OG_ERROR;
    }

    CM_ASSERT(page_req->curr_mode != DRC_LOCK_EXCLUSIVE);
    if (page_req->curr_mode == DRC_LOCK_EXCLUSIVE) {
        OG_LOG_RUN_ERR("[DCS] invalid page_req->curr_mode");
        buf_unlatch_page(session, ctrl);
        *return_ctrl = NULL;
        return OG_ERROR;
    }

    if (page_req->req_mode == DRC_LOCK_EXCLUSIVE) {
        ctrl->transfer_status = BUF_TRANS_REL_OWNER;
    }
    session->curr_page = (char *)ctrl->page;
    session->curr_page_ctrl = ctrl;
    buf_push_page(session, ctrl, LATCH_MODE_X);
    return OG_SUCCESS;
}
/*
 * Resolve a local edp (is_edp must be set, panics otherwise) according to the DRC action, under the
 * buffer bucket spin lock:
 * - DRC_RES_SHARE_ACTION: just clear is_edp.
 * - DRC_RES_CLEAN_EDP_ACTION: clean the edp if clean_lsn >= page lsn.
 * - DRC_RES_EXCLUSIVE_ACTION: load the disk copy into a temporary ctrl on the session stack. If the
 *   disk copy is newer, it replaces the memory page. The ctrl then becomes an exclusive, loaded page.
 * Returns immediately when is_fixed is already set; is_fixed is held at 1 while the action runs.
 * NOTE(review): the EXCLUSIVE path performs disk I/O (buf_load_page) while holding the bucket spin lock.
 */
void dcs_clean_local_ctrl(knl_session_t *session, buf_ctrl_t *ctrl, drc_res_action_e action, uint64 clean_lsn)
{
    buf_set_t *set = &session->kernel->buf_ctx.buf_set[ctrl->buf_pool_id];
    buf_bucket_t *bucket = BUF_GET_BUCKET(set, ctrl->bucket_id);

    OG_LOG_DEBUG_WAR(
        "[DCS][%u-%u]: fix local lock mode from buf_res after recovery, is_edp:%d, is_dirty:%d, is_remote_dirty:%d, "
        " action:%d, load status:%d, fixed:%d, clean lsn:%llu, page lsn:%llu, lock_mode:%d",
        ctrl->page_id.file, ctrl->page_id.page, ctrl->is_edp, ctrl->is_dirty, ctrl->is_remote_dirty, action,
        ctrl->load_status, ctrl->is_fixed, clean_lsn, ctrl->page->lsn, ctrl->lock_mode);

    cm_spin_lock(&bucket->lock, &session->stat->spin_stat.stat_bucket);
    knl_panic(ctrl->is_edp);
    /* Already being fixed (is_fixed set): leave it alone. */
    if (ctrl->is_fixed) {
        cm_spin_unlock(&bucket->lock);
        return;
    }
    ctrl->is_fixed = 1;

    if (action == DRC_RES_SHARE_ACTION) {
        CM_ASSERT(ctrl->lock_mode == DRC_LOCK_SHARE && ctrl->load_status == BUF_IS_LOADED);
        ctrl->is_edp = 0;
    } else if (action == DRC_RES_CLEAN_EDP_ACTION) {
        CM_ASSERT(clean_lsn != OG_INVALID_ID64);
        if (clean_lsn >= ctrl->page->lsn) {
            dcs_buf_clean_ctrl_edp(session, ctrl, OG_FALSE);
        }
    } else if (action == DRC_RES_EXCLUSIVE_ACTION) {
        /* Temporary ctrl followed by an aligned page buffer, both on the session stack. */
        buf_ctrl_t *tmp_ctrl =
            (buf_ctrl_t *)cm_push(session->stack,
                sizeof(buf_ctrl_t) + (uint32)(DEFAULT_PAGE_SIZE(session) + OG_MAX_ALIGN_SIZE_4K));
        /* cm_push fails when the session stack is exhausted; never dereference a NULL temp ctrl. */
        knl_panic_log(tmp_ctrl != NULL, "[DCS]edp page[%u-%u] no session stack space to load disk page",
            ctrl->page_id.file, ctrl->page_id.page);
        *tmp_ctrl = *ctrl;
        tmp_ctrl->lock_mode = DRC_LOCK_SHARE;
        tmp_ctrl->is_edp = 0;
        tmp_ctrl->is_dirty = 0;
        tmp_ctrl->page = (page_head_t *)cm_aligned_buf((char *)tmp_ctrl + (uint64)sizeof(buf_ctrl_t));
        tmp_ctrl->page->lsn = 0;
        if (buf_load_page(session, tmp_ctrl, tmp_ctrl->page_id) != OG_SUCCESS) {
            tmp_ctrl->load_status = (uint8)BUF_LOAD_FAILED;
            knl_panic_log(0, "[DCS]edp page[%u-%u] (lsn:%llu) load from disk failed", ctrl->page_id.file,
                ctrl->page_id.page, ctrl->page->lsn);
        }
        if (ctrl->page->lsn < dtc_get_ctrl_lsn(tmp_ctrl)) {
            OG_LOG_RUN_WAR("[DCS]edp page[%u-%u] (lsn:%llu) is older than disk page(%llu), reload from disk",
                ctrl->page_id.file, ctrl->page_id.page, ctrl->page->lsn, tmp_ctrl->page->lsn);
            errno_t err_s;
            err_s = memcpy_s(ctrl->page, DEFAULT_PAGE_SIZE(session), tmp_ctrl->page, DEFAULT_PAGE_SIZE(session));
            knl_securec_check(err_s);
            dcs_buf_clean_ctrl_edp(session, ctrl, OG_FALSE);
        }
        cm_pop(session->stack);
        ctrl->is_edp = 0;
        ctrl->lock_mode = DRC_LOCK_EXCLUSIVE;
        ctrl->load_status = BUF_IS_LOADED;
        ctrl->force_request = 0;
    } else {
        CM_ASSERT(0);
    }
    ctrl->is_fixed = 0;
    cm_spin_unlock(&bucket->lock);
}
* owner transfer local page to requester
*/
status_t static inline dcs_owner_transfer_page(knl_session_t *session, uint8 owner_id, msg_page_req_t *page_req)
{
mes_message_head_t req_head;
buf_ctrl_t *ctrl = NULL;
uint8 flag = 0;
uint16 size;
status_t ret;
bool32 skip_check = page_req->is_retry;
owner is not changed. */
uint8 req_id = page_req->head.src_inst;
uint32 req_sid = page_req->head.src_sid;
uint32 req_rsn = page_req->head.rsn;
if (page_req->page_id.file >= INVALID_FILE_ID ||
(page_req->req_mode != DRC_LOCK_EXCLUSIVE && page_req->req_mode != DRC_LOCK_SHARE) ||
(page_req->action != DRC_RES_INVALID_ACTION && page_req->action != DRC_RES_SHARE_ACTION &&
page_req->action != DRC_RES_EXCLUSIVE_ACTION)) {
OG_LOG_RUN_ERR("invalid page_id [%u-%u] or req mode %u or action %d", page_req->page_id.file,
page_req->page_id.page, page_req->req_mode, page_req->action);
mes_send_error_msg(&page_req->head);
return OG_ERROR;
}
if (DRC_STOP_DCS_IO_FOR_REFORMING(page_req->req_version, session, page_req->page_id)) {
OG_LOG_RUN_ERR("[DCS][%u-%u]: reforming, owner transfer page failed, req_version=%llu, cur_version=%llu",
page_req->page_id.file, page_req->page_id.page, page_req->req_version,
DRC_GET_CURR_REFORM_VERSION);
mes_send_error_msg(&page_req->head);
return OG_ERROR;
}
bool32 need_load = OG_FALSE;
ret = dcs_read_local_page4transfer(session, page_req, &ctrl, &need_load);
if (ret == OG_ERROR) {
mes_send_error_msg(&page_req->head);
return ret;
}
if (!ctrl || need_load) {
flag = MES_FLAG_NEED_LOAD;
} else {
if (page_req->action != DRC_RES_INVALID_ACTION && ctrl->is_edp) {
dcs_clean_local_ctrl(session, ctrl, page_req->action, OG_INVALID_ID64);
}
if ((page_req->curr_mode == DRC_LOCK_SHARE) && (page_req->req_mode == DRC_LOCK_EXCLUSIVE)) {
if (ctrl->lock_mode != DRC_LOCK_SHARE && !skip_check) {
OG_LOG_RUN_ERR("[DCS][%u-%u]: owner transfer page failed, invalid lock_mode(%u)",
page_req->page_id.file, page_req->page_id.page, ctrl->lock_mode);
mes_send_error_msg(&page_req->head);
dcs_leave_page(session);
return OG_ERROR;
}
flag = MES_FLAG_READONLY2X;
}
if (page_req->lsn > ctrl->page->lsn) {
OG_LOG_RUN_ERR(
"[DCS][%u-%u]: owner transfer page failed, invalid page_req->lsn(%llu), ctrl->page->lsn(%llu)",
page_req->page_id.file, page_req->page_id.page, page_req->lsn, ctrl->page->lsn);
mes_send_error_msg(&page_req->head);
dcs_leave_page(session);
return OG_ERROR;
}
}
msg_ask_page_ack_t ask_page;
req_head.src_inst = req_id;
req_head.dst_inst = owner_id;
req_head.src_sid = req_sid;
req_head.rsn = req_rsn;
ask_page.req_version = page_req->req_version;
size = sizeof(msg_ask_page_ack_t);
if (flag == 0) {
size += DEFAULT_PAGE_SIZE(session);
}
mes_init_ack_head(&req_head, &ask_page.head, MES_CMD_PAGE_READY, size, DCS_SELF_SID(session));
ask_page.lsn = 0;
ask_page.scn = 0;
ask_page.mode = page_req->req_mode;
ask_page.head.flags = flag;
if (!ctrl) {
SYNC_POINT_GLOBAL_START(OGRAC_DCS_TRANSFER_BEFORE_SEND_ABORT, NULL, 0);
SYNC_POINT_GLOBAL_END;
ret = dcs_send_data_retry((void *)&ask_page);
DTC_DCS_DEBUG(ret, "[DCS][%u-%u][%s]: status=(%d), need load, dest_id=%u, dest_sid=%u, mode=%u",
page_req->page_id.file, page_req->page_id.page, MES_CMD2NAME(ask_page.head.cmd), ret,
ask_page.head.dst_inst, ask_page.head.dst_sid, page_req->req_mode);
SYNC_POINT_GLOBAL_START(OGRAC_DCS_TRANSFER_AFTER_SEND_ABORT, NULL, 0);
SYNC_POINT_GLOBAL_END;
return ret;
}
knl_panic(!ctrl->is_readonly);
if (ctrl->is_dirty || ctrl->is_marked) {
knl_begin_session_wait(session, DCS_TRANSFER_PAGE_FLUSHLOG, OG_TRUE);
if (OGRAC_NEED_FLUSH_LOG(session, ctrl)) {
if (log_flush(session, NULL, NULL, NULL) != OG_SUCCESS) {
CM_ABORT(0, "[DTC DCS][%u-%u]: ABORT INFO: flush redo log failed", page_req->page_id.file,
page_req->page_id.page);
}
}
knl_end_session_wait(session, DCS_TRANSFER_PAGE_FLUSHLOG);
}
ask_page.lsn = DB_CURR_LSN(session);
ask_page.scn = DB_CURR_SCN(session);
if (ctrl->is_dirty) {
ask_page.head.flags |= MES_FLAG_DIRTY_PAGE;
}
if (ctrl->is_remote_dirty) {
ask_page.head.flags |= MES_FLAG_REMOTE_DIRTY_PAGE;
}
if (page_req->req_mode == DRC_LOCK_EXCLUSIVE) {
knl_panic(!ctrl->is_marked && !ctrl->is_readonly);
ask_page.edp_map = ctrl->edp_map;
if (ctrl->is_dirty || ctrl->is_remote_dirty) {
ask_page.edp_map = ask_page.edp_map | (1ULL << session->kernel->id);
}
} else {
knl_panic(flag != MES_FLAG_READONLY2X);
ask_page.edp_map = 0;
}
knl_begin_session_wait(session, DCS_TRANSFER_PAGE, OG_TRUE);
SYNC_POINT_GLOBAL_START(OGRAC_DCS_TRANSFER_BEFORE_SEND_ABORT, NULL, 0);
SYNC_POINT_GLOBAL_END;
if (DRC_STOP_DCS_IO_FOR_REFORMING(page_req->req_version, session, page_req->page_id)) {
OG_LOG_RUN_ERR("[DCS][%u-%u]: reforming, owner transfer page failed, req_version=%llu, cur_version=%llu",
page_req->page_id.file, page_req->page_id.page, page_req->req_version,
DRC_GET_CURR_REFORM_VERSION);
ctrl->transfer_status = BUF_TRANS_NONE;
dcs_leave_page(session);
mes_send_error_msg(&page_req->head);
return OG_ERROR;
}
if (flag != 0) {
ret = dcs_send_data_retry((void *)&ask_page);
} else {
ret = dcs_send_data3_retry(&ask_page.head, sizeof(msg_ask_page_ack_t), (void *)session->curr_page);
if (ret == OG_SUCCESS) {
session->stat->dcs_buffer_sends++;
}
}
if (ret == OG_SUCCESS && DCS_BUF_CTRL_IS_OWNER(session, ctrl)) {
SYNC_POINT_GLOBAL_START(OGRAC_DCS_TRANSFER_AFTER_SEND_ABORT, NULL, 0);
SYNC_POINT_GLOBAL_END;
if (page_req->req_mode == DRC_LOCK_EXCLUSIVE) {
ctrl->lock_mode = DRC_LOCK_NULL;
ctrl->load_status = BUF_LOAD_FAILED;
if (ctrl->is_dirty) {
ctrl->is_edp = OG_TRUE;
ctrl->edp_scn = DB_CURR_SCN(session);
}
ctrl->is_remote_dirty = 0;
} else {
ctrl->lock_mode = DRC_LOCK_SHARE;
}
}
if (ret != OG_SUCCESS && ctrl->transfer_status == BUF_TRANS_REL_OWNER) {
ctrl->transfer_status = BUF_TRANS_NONE;
}
knl_end_session_wait(session, DCS_TRANSFER_PAGE);
DTC_DCS_DEBUG(
ret,
"[DCS][%u-%u][%s]: after owner transfer page, status=(%d), dest_id=%u, dest_sid=%u, mode=%u, ctrl_dirty=%u, remote dirty=%u, ctrl_lock_mode=%u, ctrl_is_edp=%u,"
"page pcn=%d, page_lsn=%llu, sync lsn=%llu, sync scn=%llu, page_type=%u, page req mode=%d, flag=%d, retry=%d, req rsn=%u",
page_req->page_id.file, page_req->page_id.page, MES_CMD2NAME(ask_page.head.cmd), ret, ask_page.head.dst_inst,
ask_page.head.dst_sid, page_req->req_mode, ctrl->is_dirty, ctrl->is_remote_dirty, ctrl->lock_mode, ctrl->is_edp,
ctrl->page->pcn, ctrl->page->lsn, ask_page.lsn, ask_page.scn, ((heap_page_t *)session->curr_page)->head.type,
page_req->req_mode, flag, skip_check, ask_page.head.rsn);
dcs_leave_page(session);
return ret;
}
/*
 * Remote leg of owner notification, executed on the master.
 * If the recorded owner is the requester itself, answer the requester directly with
 * MES_CMD_MASTER_ACK_ALREADY_OWNER. Otherwise rewrite page_req's header into an MES_CMD_ASK_OWNER
 * message (the requester stays the message source) and forward it to owner_id.
 * Returns OG_ERROR when DCS IO is stopped for reforming or when a send fails.
 */
static inline status_t dcs_notify_owner_for_page_r(knl_session_t *session, uint8 owner_id, msg_page_req_t *page_req)
{
    uint8 requester_id = page_req->head.src_inst;
    uint32 requester_sid = page_req->head.src_sid;
    uint32 requester_rsn = page_req->head.rsn;

    if (owner_id == requester_id) {
        /* the requester already owns the page: reply to it straight away */
        msg_ack_owner_t owner_ack;
        mes_init_send_head(&owner_ack.head, MES_CMD_MASTER_ACK_ALREADY_OWNER, sizeof(msg_ack_owner_t),
            requester_rsn, DCS_SELF_INSTID(session), requester_id, DCS_SELF_SID(session), requester_sid);
        owner_ack.req_version = page_req->req_version;
        owner_ack.action = page_req->action;
        owner_ack.lsn = page_req->lsn;
        owner_ack.req_mode = page_req->req_mode;
        if (dcs_send_data_retry(&owner_ack) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DCS][%u-%u][%s]: failed, dest_id=%u, dest_sid=%u, mode=%u", page_req->page_id.file,
                page_req->page_id.page, MES_CMD2NAME(owner_ack.head.cmd), owner_ack.head.dst_inst,
                owner_ack.head.dst_sid, page_req->req_mode);
            return OG_ERROR;
        }
        DTC_DCS_DEBUG_INF("[DCS][%u-%u][%s]: dest_id=%u, dest_sid=%u, mode=%u", page_req->page_id.file,
            page_req->page_id.page, MES_CMD2NAME(owner_ack.head.cmd), owner_ack.head.dst_inst,
            owner_ack.head.dst_sid, page_req->req_mode);
        return OG_SUCCESS;
    }

    if (DRC_STOP_DCS_IO_FOR_REFORMING(page_req->req_version, session, page_req->page_id)) {
        OG_LOG_RUN_ERR("[DCS][%u-%u]: doing remaster", page_req->page_id.file, page_req->page_id.page);
        return OG_ERROR;
    }

    /* forward to the current owner on behalf of the requester */
    status_t send_status;
    mes_init_send_head(&page_req->head, MES_CMD_ASK_OWNER, sizeof(msg_page_req_t), requester_rsn, requester_id,
        owner_id, requester_sid, OG_INVALID_ID16);
    SYNC_POINT_GLOBAL_START(OGRAC_DCS_NOTIFY_OWNER_SEND_FAIL, &send_status, OG_ERROR);
    send_status = dcs_send_data_retry(page_req);
    SYNC_POINT_GLOBAL_END;
    if (send_status != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DCS][%u-%u][%s]: dcs_notify_owner_for_page_r failed, dest_id=%u, dest_sid=%u, mode=%u",
            page_req->page_id.file, page_req->page_id.page, MES_CMD2NAME(page_req->head.cmd),
            page_req->head.dst_inst, page_req->head.dst_sid, page_req->req_mode);
        return OG_ERROR;
    }
    DTC_DCS_DEBUG_INF("[DCS][%u-%u][%s]: dest_id=%u, dest_sid=%u, mode=%u", page_req->page_id.file,
        page_req->page_id.page, MES_CMD2NAME(page_req->head.cmd), page_req->head.dst_inst,
        page_req->head.dst_sid, page_req->req_mode);
    SYNC_POINT_GLOBAL_START(OGRAC_DCS_NOTIFY_OWNER_SUCC_ABORT, NULL, 0);
    SYNC_POINT_GLOBAL_END;
    return OG_SUCCESS;
}
/*
 * Master-side dispatch for a page whose ownership must move to the requester of page_req.
 * If this instance is the current owner and the requester is another instance, the page is transferred
 * locally with dcs_owner_transfer_page. In every other case the work is delegated to
 * dcs_notify_owner_for_page_r.
 */
static status_t dcs_notify_owner_for_page(knl_session_t *session, uint8 owner_id, msg_page_req_t *page_req)
{
    bool32 owner_is_requester = (owner_id == page_req->head.src_inst);

    if (!owner_is_requester && owner_id == DCS_SELF_INSTID(session)) {
        status_t status = dcs_owner_transfer_page(session, owner_id, page_req);
        if (SECUREC_UNLIKELY(status != OG_SUCCESS)) {
            OG_LOG_RUN_ERR("[DCS][%u-%u][owner transfer page]: failed, dest_id=%u, dest_sid=%u, dest_rsn=%u, mode=%u",
                page_req->page_id.file, page_req->page_id.page, page_req->head.src_inst,
                page_req->head.src_sid, page_req->head.rsn, page_req->req_mode);
        }
        return status;
    }

#ifdef DB_DEBUG_VERSION
    if (owner_is_requester) {
        DTC_DCS_DEBUG_INF("[DCS][%u-%u]notify owner for page, set already owner on same node, owner_id=%d, dest_id=%u, "
                          "dest_sid=%u, dest_rsn=%u, mode=%u",
            page_req->page_id.file, page_req->page_id.page, owner_id, page_req->head.src_inst,
            page_req->head.src_sid, page_req->head.rsn, page_req->req_mode);
    }
#endif
    return dcs_notify_owner_for_page_r(session, owner_id, page_req);
}
/*
 * Reply MES_CMD_MASTER_ACK_NEED_LOAD to the requester of page_req.
 * The ack carries this instance's current LSN and SCN, and echoes the request's rsn and req_version.
 * A send failure is logged and otherwise ignored.
 */
static inline void dcs_send_requester_granted(knl_session_t *session, msg_page_req_t *page_req)
{
    msg_pg_ack_ld_t grant_ack;
    mes_init_ack_head(&page_req->head, &grant_ack.head, MES_CMD_MASTER_ACK_NEED_LOAD, sizeof(msg_pg_ack_ld_t),
        DCS_SELF_SID(session));
    grant_ack.head.rsn = page_req->head.rsn;
    grant_ack.master_lsn = DB_CURR_LSN(session);
    grant_ack.scn = DB_CURR_SCN(session);
    grant_ack.req_version = page_req->req_version;

    if (dcs_send_data_retry(&grant_ack) == OG_SUCCESS) {
        if (LOG_DEBUG_INF_ON) {
            DTC_DCS_DEBUG_INF("[DCS][%u-%u][%s]: dest_id=%u, dest_sid=%u, mode=%u, rsn=%u", page_req->page_id.file,
                page_req->page_id.page, MES_CMD2NAME(grant_ack.head.cmd), grant_ack.head.dst_inst,
                grant_ack.head.dst_sid, page_req->req_mode, grant_ack.head.rsn);
        }
        return;
    }
    OG_LOG_RUN_ERR("[DCS]failed to send ack");
}
/*
 * Reply MES_CMD_MASTER_ACK_ALREADY_OWNER to the requester of page_req, i.e. tell it that it already holds
 * the page. The ack echoes the request's req_version, action, lsn and req_mode.
 * A send failure is now logged rather than dropped silently, so a requester that is left waiting can be
 * correlated with the master-side failure.
 */
static inline void dcs_send_requester_already_owner(knl_session_t *session, msg_page_req_t *page_req)
{
    msg_ack_owner_t ack;
    mes_init_ack_head(&page_req->head, &ack.head, MES_CMD_MASTER_ACK_ALREADY_OWNER, sizeof(msg_ack_owner_t),
        DCS_SELF_SID(session));
    ack.req_version = page_req->req_version;
    ack.action = page_req->action;
    ack.lsn = page_req->lsn;
    ack.req_mode = page_req->req_mode;
    if (dcs_send_data_retry(&ack) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DCS][%u-%u][%s]: failed to send ack, dest_id=%u, dest_sid=%u, mode=%u, rsn=%u",
            page_req->page_id.file, page_req->page_id.page, MES_CMD2NAME(ack.head.cmd), ack.head.dst_inst,
            ack.head.dst_sid, page_req->req_mode, ack.head.rsn);
        return;
    }
    if (LOG_DEBUG_INF_ON) {
        DTC_DCS_DEBUG_INF("[DCS][%u-%u][%s]: dest_id=%u, dest_sid=%u, mode=%u, rsn=%u", page_req->page_id.file,
            page_req->page_id.page, MES_CMD2NAME(ack.head.cmd), ack.head.dst_inst, ack.head.dst_sid,
            page_req->req_mode, ack.head.rsn);
    }
}
/*
 * Send a header-only error reply to the requester of page_req.
 * The command must be MES_CMD_ERROR_MSG: the requester-side wait paths in this file check for that command
 * and fail the request. The previous MES_CMD_MASTER_ACK_ALREADY_OWNER command, sent with only a header-sized
 * body, would presumably be handled as a success ack whose body is missing.
 * NOTE(review): confirm no caller relied on the old command value.
 */
static inline void dcs_send_error_msg(knl_session_t *session, msg_page_req_t *page_req)
{
    mes_message_head_t head;
    mes_init_ack_head(&page_req->head, &head, MES_CMD_ERROR_MSG, sizeof(mes_message_head_t),
        DCS_SELF_SID(session));
    if (mes_send_data(&head) != OG_SUCCESS) {
        return;
    }
    if (LOG_DEBUG_INF_ON) {
        DTC_DCS_DEBUG_INF("[DCS][%u-%u][%s]: dest_id=%u, dest_sid=%u, mode=%u, rsn=%u", page_req->page_id.file,
            page_req->page_id.page, MES_CMD2NAME(head.cmd), head.dst_inst, head.dst_sid,
            page_req->req_mode, head.rsn);
    }
}
/*
 * Master-side handler for a single-page ownership request (msg_page_req_t).
 * Validates and copies the request, releases the receive buffer, and asks the DRC for the page owner.
 * It then acts on the result:
 *   - GRANTED: reply NEED_LOAD.
 *   - ALREADY_OWNER: reply ALREADY_OWNER.
 *   - WAITING: leave the request queued; no reply here.
 *   - CONVERTING: have the current owner transfer the page.
 */
void dcs_process_ask_master_for_page(void *sess, mes_message_t *receive_msg)
{
drc_req_owner_result_t result;
knl_session_t *session = (knl_session_t *)sess;
/* drop malformed messages before reading the payload; no reply is sent */
if (sizeof(msg_page_req_t) != receive_msg->head->size) {
OG_LOG_RUN_ERR("process ask master for page msg is invalid, msg size %u.", receive_msg->head->size);
mes_release_message_buf(receive_msg->buffer);
return;
}
/* work on a local copy so the receive buffer can be released immediately */
msg_page_req_t page_req = *(msg_page_req_t *)(receive_msg->buffer);
mes_release_message_buf(receive_msg->buffer);
/* NOTE(review): invalid lock modes are dropped without a reply; the requester presumably waits for its timeout */
if (page_req.req_mode >= DRC_LOCK_MODE_MAX || page_req.curr_mode >= DRC_LOCK_MODE_MAX) {
OG_LOG_RUN_ERR("[DCS][%u-%u][ask master for page_req]req mode invalid, cur_mode %d, req_mode %d",
page_req.page_id.file, page_req.page_id.page, page_req.curr_mode, page_req.req_mode);
return;
}
page_id_t page_id = page_req.page_id;
/* a request issued under another reform version (or while reforming) is answered with an error message */
if (DRC_STOP_DCS_IO_FOR_REFORMING(page_req.req_version, session, page_id)) {
OG_LOG_RUN_ERR("[DCS][%u-%u]reforming, ask master failed, req_version=%llu, cur_version=%llu", page_id.file,
page_id.page, page_req.req_version, DRC_GET_CURR_REFORM_VERSION);
mes_send_error_msg(&page_req.head);
return;
}
DTC_DCS_DEBUG_INF(
"[DCS][%u-%u][%s]: ask master for page, src_id=%u, src_sid=%u, req_mode=%u, curr_mode=%u, rsn=%u, lsn=%llu",
page_req.page_id.file, page_req.page_id.page, MES_CMD2NAME(page_req.head.cmd), page_req.head.src_inst,
page_req.head.src_sid, page_req.req_mode, page_req.curr_mode, page_req.head.rsn, page_req.lsn);
/* describe the requester to the DRC */
drc_req_info_t req_info;
req_info.inst_id = page_req.head.src_inst;
req_info.inst_sid = page_req.head.src_sid;
req_info.rsn = page_req.head.rsn;
req_info.req_mode = page_req.req_mode;
req_info.curr_mode = page_req.curr_mode;
req_info.req_time = page_req.head.req_start_time;
req_info.req_version = page_req.req_version;
req_info.lsn = page_req.lsn;
SYNC_POINT_GLOBAL_START(OGRAC_DCS_PROC_ASK_MASTER_ABORT, NULL, 0);
SYNC_POINT_GLOBAL_END;
/* the fourth argument is OG_FALSE here; the batch "try" path passes OG_TRUE */
status_t ret = drc_request_page_owner(session, page_req.page_id, &req_info, OG_FALSE, &result);
if (SECUREC_UNLIKELY(ret != OG_SUCCESS)) {
mes_send_error_msg(&page_req.head);
DTC_DCS_DEBUG_ERR(
"[DCS][%u-%u][%s]: failed to request page_req owner, src_id=%u, src_sid=%u, req_mode=%u, curr_mode=%u, "
"req_rsn=%u",
page_req.page_id.file, page_req.page_id.page, MES_CMD2NAME(page_req.head.cmd), page_req.head.src_inst,
page_req.head.src_sid, page_req.req_mode, page_req.curr_mode, page_req.head.rsn);
return;
}
DTC_DRC_DEBUG_INF(
"[DRC][%u-%u][ask master for page, after request]: req_id=%u, req_sid=%u, req_rsn=%u, "
"req_mode=%u, curr_mode=%u, req_version=%llu, cur_version=%llu, result=%d, action=%d, curr owner:%d",
page_id.file, page_id.page, req_info.inst_id, req_info.inst_sid, req_info.rsn, req_info.req_mode,
req_info.curr_mode, req_info.req_version, DRC_GET_CURR_REFORM_VERSION, result.type, result.action,
result.curr_owner_id);
/* propagate the DRC decision into the request before it is echoed back or forwarded */
page_req.action = result.action;
page_req.is_retry = result.is_retry;
page_req.req_mode = result.req_mode;
switch (result.type) {
case DRC_REQ_OWNER_GRANTED:
dcs_send_requester_granted(session, &page_req);
break;
case DRC_REQ_OWNER_ALREADY_OWNER:
dcs_send_requester_already_owner(session, &page_req);
break;
case DRC_REQ_OWNER_WAITING:
/* request stays queued in the DRC; no reply is sent from here */
DTC_DCS_DEBUG_INF("[DCS][%u-%u][waiting for converting]: dest_id=%u, dest_sid=%u, req_mode=%u, curr_mode=%u",
page_req.page_id.file, page_req.page_id.page, page_req.head.src_inst,
page_req.head.src_sid, page_req.req_mode, page_req.curr_mode);
break;
case DRC_REQ_OWNER_CONVERTING:
/* failures are logged inside the callee and otherwise ignored here */
(void)dcs_notify_owner_for_page(session, result.curr_owner_id, &page_req);
break;
default:
/* NOTE(review): no reply is sent for an unexpected result type */
OG_LOG_RUN_ERR("[DCS][%u-%u] unexpected owner request result, type=%u", page_req.page_id.file,
page_req.page_id.page, result.type);
break;
}
}
static status_t dcs_try_get_page_exclusive_owner_l(knl_session_t *session, drc_req_info_t *req_info,
    page_id_t *page_ids, uint32 count, drc_req_owner_result_t *result);

/*
 * Master-side handler for a batch "try ask master" request (msg_page_req_batch_t).
 *
 * For every valid page id in the batch, try to obtain exclusive ownership for the requester with
 * dcs_try_get_page_exclusive_owner_l. Then reply MES_CMD_TRY_ASK_MASTER_ACK carrying one
 * drc_req_owner_result_t per entry. Malformed messages are dropped without a reply.
 *
 * page_req points into receive_msg->buffer. Every read of it, including deriving the ack header from
 * the request header, must therefore happen before mes_release_message_buf().
 */
void dcs_process_try_ask_master_for_page(void *sess, mes_message_t *receive_msg)
{
    msg_owner_req_t owner_ack;
    msg_page_req_batch_t *page_req = (msg_page_req_batch_t *)(receive_msg->buffer);
    knl_session_t *session = (knl_session_t *)sess;

    /* check the size first: reading page_req->count from a short message would run past the payload */
    if (sizeof(msg_page_req_batch_t) != receive_msg->head->size) {
        OG_LOG_RUN_ERR("msg is invalid, msg size %u", receive_msg->head->size);
        mes_release_message_buf(receive_msg->buffer);
        return;
    }
    /* count indexes owner_ack.result[] and page_req->page_ids[] below */
    if (page_req->count > BUF_MAX_PREFETCH_NUM) {
        OG_LOG_RUN_ERR("msg is invalid, msg size %u, count:%u", receive_msg->head->size, page_req->count);
        mes_release_message_buf(receive_msg->buffer);
        return;
    }

    drc_req_info_t req_info;
    req_info.inst_id = page_req->head.src_inst;
    req_info.inst_sid = page_req->head.src_sid;
    req_info.req_mode = DRC_LOCK_EXCLUSIVE;
    req_info.curr_mode = DRC_LOCK_NULL;
    req_info.rsn = page_req->head.rsn;
    req_info.req_time = page_req->head.req_start_time;
    req_info.req_version = page_req->req_version;
    req_info.lsn = 0;
    for (uint32 i = 0; i < page_req->count; i++) {
        owner_ack.result[i].type = DRC_REQ_OWNER_INVALID;
    }
    (void)dcs_try_get_page_exclusive_owner_l(session, &req_info, page_req->page_ids, page_req->count, owner_ack.result);
    owner_ack.count = page_req->count;

#ifdef DB_DEBUG_VERSION
    char msg[SIZE_K(2)] = { 0 };
    uint16 msg_len = SIZE_K(2);
    uint32 pos = 0;
    int iret_snprintf;
    for (uint32 i = 0; i < owner_ack.count; i++) {
        if (!IS_INVALID_PAGID(page_req->page_ids[i])) {
            /* ", " separates entries, matching the other batch debug logs in this file */
            iret_snprintf = snprintf_s(msg + pos, msg_len - pos, msg_len - pos - 1, "%u-%u:%d, ",
                page_req->page_ids[i].file, page_req->page_ids[i].page, owner_ack.result[i].type);
            if (SECUREC_UNLIKELY(iret_snprintf == -1)) {
                knl_panic_log(0, "Secure C lib has thrown an error %d", iret_snprintf);
            }
            pos += iret_snprintf;
        }
    }
    DTC_DCS_DEBUG_INF("[DCS][after try to ask master for page_req]: dest_id=%u, dest_sid=%u, %s",
        page_req->head.src_inst, page_req->head.src_sid, msg);
#endif

    /* build the ack header from the request header while the receive buffer is still owned */
    mes_init_ack_head(&page_req->head, &owner_ack.head, MES_CMD_TRY_ASK_MASTER_ACK, sizeof(msg_owner_req_t),
        OG_INVALID_ID16);
    mes_release_message_buf(receive_msg->buffer);
    if (dcs_send_data_retry(&owner_ack) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DCS]failed to send ack");
    }
}
/*
 * Requester-side: ask the current owner (result->curr_owner_id) to transfer ctrl's page in req_mode.
 * Then block in mes_recv for the owner's reply, up to DCS_WAIT_MSG_TIMEOUT.
 * Returns OG_ERROR in any of these cases:
 *   - reforming;
 *   - send failure;
 *   - timeout;
 *   - an MES_CMD_ERROR_MSG reply.
 * Otherwise returns the status of dcs_handle_ack_page_ready on the reply.
 */
static status_t dcs_ask_owner_for_page(knl_session_t *session, buf_ctrl_t *ctrl, drc_req_owner_result_t *result,
drc_lock_mode_e req_mode, uint64 req_version)
{
status_t ret;
page_id_t page_id = ctrl->page_id;
msg_page_req_t page_req;
/* NOTE(review): rsn is passed as OG_INVALID_ID32; mes_recv below waits on page_req.head.rsn, so
 * mes_init_send_head presumably assigns a fresh rsn in that case — confirm */
mes_init_send_head(&page_req.head, MES_CMD_ASK_OWNER, sizeof(msg_page_req_t), OG_INVALID_ID32,
DCS_SELF_INSTID(session), result->curr_owner_id, DCS_SELF_SID(session), OG_INVALID_ID16);
page_req.page_id = page_id;
page_req.req_mode = req_mode;
page_req.curr_mode = ctrl->lock_mode;
page_req.req_version = req_version;
page_req.action = result->action;
page_req.lsn = dtc_get_ctrl_lsn(ctrl);
page_req.is_retry = result->is_retry;
if (DRC_STOP_DCS_IO_FOR_REFORMING(page_req.req_version, session, page_req.page_id)) {
OG_LOG_RUN_ERR("[DCS][%u-%u]: reforming, send ask owner failed, req_version=%llu, cur_version=%llu",
page_req.page_id.file, page_req.page_id.page, req_version, DRC_GET_CURR_REFORM_VERSION);
return OG_ERROR;
}
/* sync point may force ret to OG_ERROR for fault injection */
SYNC_POINT_GLOBAL_START(OGRAC_DCS_NOTIFY_OWNER_SEND_FAIL, &ret, OG_ERROR);
ret = dcs_send_data_retry(&page_req);
SYNC_POINT_GLOBAL_END;
if (ret != OG_SUCCESS) {
OG_LOG_RUN_ERR("[DCS][%u-%u][%s]: send msg failed, src_id=%u, src_sid=%u, dest_id=%u, dest_sid=%u, req_mode=%u",
page_id.file, page_id.page, MES_CMD2NAME(page_req.head.cmd), page_req.head.src_inst,
page_req.head.src_sid, page_req.head.dst_inst, page_req.head.dst_sid, req_mode);
return OG_ERROR;
}
DTC_DCS_DEBUG_INF("[DCS][%u-%u][%s]: src_id=%u, src_sid=%u, dest_id=%u, dest_sid=%u, req_mode=%u", page_id.file,
page_id.page, MES_CMD2NAME(page_req.head.cmd), page_req.head.src_inst, page_req.head.src_sid,
page_req.head.dst_inst, page_req.head.dst_sid, req_mode);
SYNC_POINT_GLOBAL_START(OGRAC_DCS_NOTIFY_OWNER_SUCC_ABORT, NULL, 0);
SYNC_POINT_GLOBAL_END;
/* wait for the owner's reply matching this request's rsn */
mes_message_t msg;
if (mes_recv(session->id, &msg, OG_FALSE, page_req.head.rsn, DCS_WAIT_MSG_TIMEOUT) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[DCS][%u-%u][%s]: ack time out, src_id=%u, src_sid=%u, dest_id=%u, dest_sid=%u, req_mode=%u",
page_id.file, page_id.page, MES_CMD2NAME(page_req.head.cmd), page_req.head.src_inst,
page_req.head.src_sid, page_req.head.dst_inst, page_req.head.dst_sid, req_mode);
return OG_ERROR;
}
/* the owner refused (e.g. reforming on its side): release the reply and fail */
if (msg.head->cmd == MES_CMD_ERROR_MSG) {
DTC_DCS_DEBUG_ERR("[DCS][%u-%u][%s]: ack err msg, src_id=%u, src_sid=%u, dest_id=%u, dest_sid=%u, req_mode=%u",
page_id.file, page_id.page, MES_CMD2NAME(page_req.head.cmd), page_req.head.src_inst,
page_req.head.src_sid, page_req.head.dst_inst, page_req.head.dst_sid, req_mode);
mes_release_message_buf(msg.buffer);
return OG_ERROR;
}
SYNC_POINT_GLOBAL_START(OGRAC_DCS_ASK_MASTER_ACK_SUCC_ABORT, NULL, 0);
SYNC_POINT_GLOBAL_END;
/* install the received page / ownership into ctrl; the reply buffer is released regardless of outcome */
ret = dcs_handle_ack_page_ready(session, DCS_SELF_INSTID(session), &msg, ctrl, req_mode);
mes_release_message_buf(msg.buffer);
return ret;
}
/*
 * Owner-side handler for MES_CMD_ASK_OWNER.
 * After the size check, copy the request, release the receive buffer, and transfer the page to the
 * requester with dcs_owner_transfer_page. A failed transfer is logged; nothing else is done here.
 */
void dcs_process_ask_owner_for_page(void *sess, mes_message_t *receive_msg)
{
    knl_session_t *session = (knl_session_t *)sess;
    if (receive_msg->head->size != sizeof(msg_page_req_t)) {
        OG_LOG_RUN_ERR("process ask owner for page msg is invalid, msg size %u.", receive_msg->head->size);
        mes_release_message_buf(receive_msg->buffer);
        return;
    }

    /* local copy: the receive buffer is handed back before the (possibly slow) transfer */
    msg_page_req_t request = *(msg_page_req_t *)(receive_msg->buffer);
    mes_release_message_buf(receive_msg->buffer);

    status_t transfer_status = dcs_owner_transfer_page(session, DCS_SELF_INSTID(session), &request);
    if (!SECUREC_UNLIKELY(transfer_status != OG_SUCCESS)) {
        return;
    }
    OG_LOG_RUN_ERR(
        "[DCS][%u-%u][process ask owner] failed, owner_id=%u, req_id=%u, req_sid=%u, req_rsn=%u, mode=%u, lsn=%llu",
        request.page_id.file, request.page_id.page, DCS_SELF_INSTID(session), request.head.src_inst,
        request.head.src_sid, request.head.rsn, request.req_mode, request.lsn);
}
/*
 * Follow-up after a DRC state change.
 * If cvt_info names a valid pending requester, build an MES_CMD_ASK_OWNER request on its behalf and let
 * dcs_notify_owner_for_page move the page from cvt_info->owner_id to it. Returns OG_SUCCESS when there is
 * no requester to serve; otherwise returns the notify status (a failure is logged).
 */
status_t inline dcs_try_notify_owner_for_page(knl_session_t *session, cvt_info_t *cvt_info)
{
    if (!DCS_INSTID_VALID(cvt_info->req_id)) {
        return OG_SUCCESS;
    }

    msg_page_req_t forward_req;
    /* header first: the helper is given the full message size */
    mes_init_send_head(&forward_req.head, MES_CMD_ASK_OWNER, sizeof(msg_page_req_t), cvt_info->req_rsn,
        cvt_info->req_id, cvt_info->owner_id, cvt_info->req_sid, OG_INVALID_ID16);
    forward_req.page_id = cvt_info->pageid;
    forward_req.req_mode = cvt_info->req_mode;
    forward_req.curr_mode = cvt_info->curr_mode;
    forward_req.req_version = cvt_info->req_version;
    forward_req.lsn = cvt_info->lsn;
    forward_req.action = DRC_RES_INVALID_ACTION;
    forward_req.is_retry = OG_FALSE;

    status_t notify_status = dcs_notify_owner_for_page(session, cvt_info->owner_id, &forward_req);
    if (SECUREC_UNLIKELY(notify_status != OG_SUCCESS)) {
        OG_LOG_RUN_ERR("[DCS][%u-%u][notify owner transfer page]: failed, owner_id=%u, req_id=%u, "
                       "req_sid=%u, req_rsn=%u, req_mode=%u, curr_mode=%u, copy_insts=%llu",
            forward_req.page_id.file, forward_req.page_id.page, cvt_info->owner_id, cvt_info->req_id,
            cvt_info->req_sid, cvt_info->req_rsn, cvt_info->req_mode, cvt_info->curr_mode,
            cvt_info->readonly_copies);
    }
    return notify_status;
}
/*
 * Master-side handler for a single claim-ownership message (msg_claim_owner_t).
 * Records the sender as owner of the page through drc_claim_page_owner. Then, if the DRC reports a
 * waiting converter in cvt_info, forwards the page to it with dcs_try_notify_owner_for_page.
 * Claims issued under a stale reform version are dropped without a reply.
 */
void dcs_process_claim_ownership_req(void *sess, mes_message_t *receive_msg)
{
    if (receive_msg->head->size != sizeof(msg_claim_owner_t)) {
        OG_LOG_RUN_ERR("process claim ownership msg is invalid, msg size %u.", receive_msg->head->size);
        mes_release_message_buf(receive_msg->buffer);
        return;
    }

    knl_session_t *session = (knl_session_t *)sess;
    msg_claim_owner_t *claim_req = (msg_claim_owner_t *)(receive_msg->buffer);
    uint64 req_version = claim_req->req_version;
    DTC_DCS_DEBUG_INF("[DCS][%u-%u][%s]: src_id=%u, src_sid=%u, dest_id=%u, dest_sid=%u, has_edp=%u, req mode=%d",
        claim_req->page_id.file, claim_req->page_id.page, MES_CMD2NAME(claim_req->head.cmd),
        claim_req->head.src_inst, claim_req->head.src_sid, claim_req->head.dst_inst, claim_req->head.dst_sid,
        claim_req->has_edp, claim_req->mode);

    if (DRC_STOP_DCS_IO_FOR_REFORMING(req_version, session, claim_req->page_id)) {
        OG_LOG_RUN_ERR("[DCS][%u-%u]: reforming, claim owner failed, req_version=%llu, cur_version=%llu",
            claim_req->page_id.file, claim_req->page_id.page, req_version, DRC_GET_CURR_REFORM_VERSION);
        mes_release_message_buf(receive_msg->buffer);
        return;
    }

    claim_info_t claim_info;
    cvt_info_t cvt_info;
    DRC_SET_CLAIM_INFO(&claim_info, claim_req->head.src_inst, claim_req->head.src_sid, claim_req->page_id,
        claim_req->has_edp, claim_req->mode, claim_req->lsn);
    drc_claim_page_owner(session, &claim_info, &cvt_info, req_version);
    mes_release_message_buf(receive_msg->buffer);
    /* failures are logged inside the callee */
    (void)dcs_try_notify_owner_for_page(session, &cvt_info);
}
/*
 * Master-side handler for MES_CMD_CLAIM_OWNER_REQ_BATCH.
 * After validating the message size and the page count, claims exclusive ownership for the sender on every
 * valid page id in the batch via dcs_claim_ownership_internal. No reply is sent from this handler.
 */
void dcs_process_claim_ownership_req_batch(void *sess, mes_message_t *receive_msg)
{
    if (receive_msg->head->size != sizeof(msg_claim_owner_batch_t)) {
        OG_LOG_RUN_ERR("process claim ownership batch msg is invalid, msg size %u.", receive_msg->head->size);
        mes_release_message_buf(receive_msg->buffer);
        return;
    }

    knl_session_t *session = (knl_session_t *)sess;
    msg_claim_owner_batch_t *batch = (msg_claim_owner_batch_t *)(receive_msg->buffer);
    uint64 req_version = batch->req_version;
    if (batch->count > BUF_MAX_PREFETCH_NUM) {
        OG_LOG_RUN_ERR("[DCS] invalid count %u", batch->count);
        mes_release_message_buf(receive_msg->buffer);
        return;
    }

    DTC_DCS_DEBUG_INF("[DCS][%s]: process batch claim, src_id=%u, src_sid=%u, dest_id=%u, dest_sid=%u, total count=%d",
        MES_CMD2NAME(batch->head.cmd), batch->head.src_inst, batch->head.src_sid,
        batch->head.dst_inst, batch->head.dst_sid, batch->count);

    claim_info_t claim_info;
    for (uint32 idx = 0; idx < batch->count; idx++) {
        page_id_t pid = batch->page_ids[idx];
        if (!IS_INVALID_PAGID(pid)) {
            DRC_SET_CLAIM_INFO(&claim_info, batch->head.src_inst, batch->head.src_sid, pid, OG_FALSE,
                DRC_LOCK_EXCLUSIVE, 0);
            dcs_claim_ownership_internal(session, &claim_info, req_version);
        }
    }
    mes_release_message_buf(receive_msg->buffer);
}
/*
 * The requester and the master are the same instance, so req_id equals master_id (this instance's id).
 */
/*
 * Request req_mode on ctrl's page when this instance is also the page's master: query the DRC
 * in-process, then act on the result.
 *   - GRANTED: mark ctrl granted.
 *   - ALREADY_OWNER: update ctrl and claim ownership locally.
 *   - CONVERTING: ask the current owner for the page.
 *   - WAITING: wait for the ack.
 * The whole call is bracketed by the DCS_REQ_MASTER4PAGE_1WAY session wait.
 */
static status_t dcs_ask_master4page_l(knl_session_t *session, buf_ctrl_t *ctrl, drc_lock_mode_e req_mode)
{
uint64 req_version = DRC_GET_CURR_REFORM_VERSION;
page_id_t page_id = ctrl->page_id;
uint8 req_id = DCS_SELF_INSTID(session);
drc_req_owner_result_t result;
uint32 req_rsn = mes_get_rsn(session->id);
/* this session is both requester and master */
drc_req_info_t req_info;
req_info.inst_id = req_id;
req_info.inst_sid = session->id;
req_info.req_mode = req_mode;
req_info.curr_mode = ctrl->lock_mode;
req_info.rsn = req_rsn;
req_info.req_time = KNL_NOW(session);
req_info.req_version = req_version;
req_info.lsn = dtc_get_ctrl_lsn(ctrl);
DTC_DCS_DEBUG_INF("[DCS][%u-%u][ask master local]: src_id=%u, dest_id=%u, req_mode=%u, ctrl_lock_mode=%u "
"req_version=%llu, req_rsn=%u, pcn=%d, lsn=%llu",
page_id.file, page_id.page, DCS_SELF_INSTID(session), DCS_SELF_INSTID(session), req_mode,
ctrl->lock_mode, req_version, req_rsn, ctrl->page->pcn, ctrl->page->lsn);
/* every exit path below must end this wait (the two queued paths switch it to DCS_REQ_OWNER4PAGE) */
knl_begin_session_wait(session, DCS_REQ_MASTER4PAGE_1WAY, OG_TRUE);
status_t ret = drc_request_page_owner(session, page_id, &req_info, OG_FALSE, &result);
if (SECUREC_UNLIKELY(ret != OG_SUCCESS)) {
knl_end_session_wait(session, DCS_REQ_MASTER4PAGE_1WAY);
DTC_DCS_DEBUG_ERR("[DCS]failed to get page owner id: file=%u, page=%u, master id=%u", page_id.file,
page_id.page, req_id);
return OG_ERROR;
}
DTC_DRC_DEBUG_INF(
"[DRC][%u-%u][ask master local, after request]: req_id=%u, req_sid=%u, req_rsn=%u, "
"req_mode=%u, curr_mode=%u, req_version=%llu, cur_version=%llu, result=%d, action=%d, curr owner:%d",
page_id.file, page_id.page, req_info.inst_id, req_info.inst_sid, req_info.rsn, req_info.req_mode,
req_info.curr_mode, req_info.req_version, DRC_GET_CURR_REFORM_VERSION, result.type, result.action,
result.curr_owner_id);
/* the DRC may upgrade but never downgrade the requested mode */
knl_panic(result.req_mode >= req_mode);
switch (result.type) {
case DRC_REQ_OWNER_GRANTED: {
knl_panic(result.action == DRC_RES_INVALID_ACTION);
dcs_set_ctrl4granted(session, ctrl);
knl_end_session_wait(session, DCS_REQ_MASTER4PAGE_1WAY);
DTC_DCS_DEBUG_INF("[DCS][%u-%u][ask master local]: granted, src_id=%u, dest_id=%u, "
"req_mode=%u, ctrl_lock_mode=%u",
page_id.file, page_id.page, DCS_SELF_INSTID(session), DCS_SELF_INSTID(session),
result.req_mode, ctrl->lock_mode);
return OG_SUCCESS;
}
case DRC_REQ_OWNER_ALREADY_OWNER: {
/* the claim result is ignored; the request is reported as successful either way */
dcs_set_ctrl4already_owner(session, ctrl, result.req_mode, result.action);
(void)dcs_claim_ownership_l(session, page_id, result.req_mode, OG_FALSE, dtc_get_ctrl_latest_lsn(ctrl),
req_version);
knl_end_session_wait(session, DCS_REQ_MASTER4PAGE_1WAY);
return OG_SUCCESS;
}
case DRC_REQ_OWNER_CONVERTING: {
/* another instance owns the page: request it from that owner directly */
ret = dcs_ask_owner_for_page(session, ctrl, &result, result.req_mode, req_version);
knl_end_session_wait_ex(session, DCS_REQ_MASTER4PAGE_1WAY, DCS_REQ_OWNER4PAGE);
return ret;
}
case DRC_REQ_OWNER_WAITING: {
/* queued behind another converter: block until the ack for this request arrives */
ret = dcs_handle_ask_master_ack(session, DCS_SELF_INSTID(session), ctrl, result.req_mode, NULL);
knl_end_session_wait_ex(session, DCS_REQ_MASTER4PAGE_1WAY, DCS_REQ_OWNER4PAGE);
return ret;
}
default: {
knl_end_session_wait(session, DCS_REQ_MASTER4PAGE_1WAY);
knl_panic_log(0, "unexpected owner request result, type=%u", result.type);
return OG_ERROR;
}
}
}
/*
 * Route one page request to the page's master. The request is served in-process when this instance is the
 * master; otherwise it goes to the remote master via dcs_ask_master4page_r.
 * NOTE(review): the status of drc_get_page_master_id is ignored. On failure master may remain
 * OG_INVALID_ID8 and would be handed to the remote path — confirm that path rejects it.
 */
static status_t dcs_request_page_internal(knl_session_t *session, buf_ctrl_t *ctrl, page_id_t page_id,
    drc_lock_mode_e req_mode)
{
    uint8 master = OG_INVALID_ID8;
    (void)drc_get_page_master_id(page_id, &master);
    if (master != DCS_SELF_INSTID(session)) {
        return dcs_ask_master4page_r(session, ctrl, master, req_mode);
    }
    return dcs_ask_master4page_l(session, ctrl, req_mode);
}
/*
 * Acquire `mode` on page_id for ctrl through DCS.
 * Retries every DCS_RESEND_MSG_INTERVAL while the failure is ERR_DCS_MSG_EAGAIN. Gives up and returns the
 * failing status when DCS is no longer readable for the page, or on any other error code.
 *
 * The loop only exits through a return. A dcs_net_time statistics update that used to follow it was
 * unreachable, and it also added the DCS_REQ_MASTER4PAGE_1WAY slot twice, so it has been removed.
 */
status_t dcs_request_page(knl_session_t *session, buf_ctrl_t *ctrl, page_id_t page_id, drc_lock_mode_e mode)
{
    DTC_DCS_DEBUG_INF("[DCS][%u-%u][dcs request page]: enter", page_id.file, page_id.page);
    for (;;) {
        status_t ret = OG_SUCCESS;
        SYNC_POINT_GLOBAL_START(OGRAC_DCS_REQUEST_PAGE_INTERNAL_FAIL, &ret, OG_ERROR);
        ret = dcs_request_page_internal(session, ctrl, page_id, mode);
        SYNC_POINT_GLOBAL_END;
        if (ret == OG_SUCCESS) {
            DTC_DCS_DEBUG_INF("[DCS][%u-%u][dcs request page]: leave, load_status=%u", page_id.file, page_id.page,
                ctrl->load_status);
            return OG_SUCCESS;
        }
        if (!dtc_dcs_readable(session, page_id)) {
            DTC_DCS_DEBUG_INF("[DCS][%u-%u] dcs not readable, stop trying to read page.", page_id.file, page_id.page);
            return ret;
        }
        /* only transient "try again" failures are retried */
        if (cm_get_error_code() != ERR_DCS_MSG_EAGAIN) {
            return ret;
        }
        cm_reset_error();
        cm_sleep(DCS_RESEND_MSG_INTERVAL);
    }
}
/*
 * Master-local batch attempt at exclusive ownership.
 * Every valid entry of page_ids is passed to drc_request_page_owner with the shared req_info and OG_TRUE
 * as the fourth argument. Each outcome is written to the matching result[] slot. Invalid page ids are
 * skipped and their result slots are left untouched. Always returns OG_SUCCESS; per-page status is ignored.
 */
static status_t dcs_try_get_page_exclusive_owner_l(knl_session_t *session, drc_req_info_t *req_info,
    page_id_t *page_ids, uint32 count, drc_req_owner_result_t *result)
{
    for (uint32 idx = 0; idx < count; idx++) {
        page_id_t pid = page_ids[idx];
        if (IS_INVALID_PAGID(pid)) {
            continue;
        }
        drc_req_owner_result_t *outcome = &result[idx];
        (void)drc_request_page_owner(session, pid, req_info, OG_TRUE, outcome);
        DTC_DRC_DEBUG_INF(
            "[DRC][%u-%u][try get page owner, after request]: req_id=%u, req_sid=%u, req_rsn=%u, "
            "req_mode=%u, curr_mode=%u, req_version=%llu, cur_version=%llu, result=%d, action=%d, curr owner:%d",
            pid.file, pid.page, req_info->inst_id, req_info->inst_sid, req_info->rsn,
            req_info->req_mode, req_info->curr_mode, req_info->req_version, DRC_GET_CURR_REFORM_VERSION,
            outcome->type, outcome->action, outcome->curr_owner_id);
    }
    return OG_SUCCESS;
}
/*
 * Try to obtain exclusive ownership of a batch of pages from master_id, for batch load.
 * The request is made locally when this instance is the master, or remotely otherwise.
 *
 * On return:
 *   - GRANTED: ctrl_array[i] is marked granted and page_ids[i] is set to INVALID_PAGID.
 *   - ALREADY_OWNER: ctrl_array[i] is updated and page_ids[i] is kept; *valid_count counts these entries.
 *   - any other outcome: page_ids[i] is set to INVALID_PAGID.
 * Returns the status of the local/remote request, or OG_ERROR if the result array cannot be allocated
 * on the session stack.
 */
status_t dcs_try_get_page_exclusive_owner(knl_session_t *session, buf_ctrl_t **ctrl_array, page_id_t *page_ids,
    uint32 count, uint8 master_id, uint32 *valid_count)
{
    drc_req_owner_result_t *result = (drc_req_owner_result_t *)cm_push(session->stack,
        sizeof(drc_req_owner_result_t) * count);
    if (NULL == result) {
        OG_LOG_RUN_ERR("[DCS] req owner result failed to malloc memory");
        return OG_ERROR;
    }
    status_t ret;
    for (uint32 i = 0; i < count; i++) {
        result[i].type = DRC_REQ_OWNER_INVALID;
    }
#ifdef DB_DEBUG_VERSION
    char msg[SIZE_K(2)] = { 0 };
    uint16 msg_len = SIZE_K(2);
    uint32 pos = 0;
    int iret_snprintf;
    for (uint32 i = 0; i < count; i++) {
        if (!IS_INVALID_PAGID(page_ids[i])) {
            iret_snprintf = snprintf_s(msg + pos, msg_len - pos, msg_len - pos - 1, "%u-%u, ", page_ids[i].file,
                page_ids[i].page);
            if (SECUREC_UNLIKELY(iret_snprintf == -1)) {
                knl_panic_log(0, "Secure C lib has thrown an error %d", iret_snprintf);
            }
            pos += iret_snprintf;
        }
    }
    DTC_DCS_DEBUG_INF("[DCS][try get exclusive owner for pages]: %s", msg);
#endif
    if (master_id == DCS_SELF_INSTID(session)) {
        drc_req_info_t req_info;
        req_info.inst_id = DCS_SELF_INSTID(session);
        req_info.inst_sid = session->id;
        req_info.req_mode = DRC_LOCK_EXCLUSIVE;
        req_info.curr_mode = DRC_LOCK_NULL;
        req_info.rsn = OG_INVALID_ID32;
        req_info.req_time = KNL_NOW(session);
        req_info.req_version = DRC_GET_CURR_REFORM_VERSION;
        req_info.lsn = 0;
        ret = dcs_try_get_page_exclusive_owner_l(session, &req_info, page_ids, count, result);
    } else {
        knl_panic(master_id != OG_INVALID_ID8);
        ret = dcs_try_get_page_exclusive_owner_r(session, page_ids, count, master_id, result);
    }
#ifdef DB_DEBUG_VERSION
    /* restart the buffer so a pass with no valid pages does not re-print the request list */
    pos = 0;
    msg[0] = '\0';
    for (uint32 i = 0; i < count; i++) {
        if (!IS_INVALID_PAGID(page_ids[i])) {
            iret_snprintf = snprintf_s(msg + pos, msg_len - pos, msg_len - pos - 1, "%u-%u:%d, ", page_ids[i].file,
                page_ids[i].page, result[i].type);
            if (SECUREC_UNLIKELY(iret_snprintf == -1)) {
                knl_panic_log(0, "Secure C lib has thrown an error %d", iret_snprintf);
            }
            pos += iret_snprintf;
        }
    }
    DTC_DCS_DEBUG_INF("[DCS][after try get exclusive owner for pages]: %s", msg);
#endif
    *valid_count = 0;
    for (uint32 i = 0; i < count; i++) {
        switch (result[i].type) {
            case DRC_REQ_OWNER_GRANTED: {
                knl_panic(result[i].action == DRC_RES_INVALID_ACTION);
                dcs_set_ctrl4granted(session, ctrl_array[i]);
                page_ids[i] = INVALID_PAGID;
                break;
            }
            case DRC_REQ_OWNER_ALREADY_OWNER: {
                dcs_set_ctrl4already_owner(session, ctrl_array[i], result[i].req_mode, result[i].action);
                (*valid_count)++;
                break;
            }
            default: {
                /* a no-queue "try" request is not expected to produce a converting result */
                knl_panic(result[i].type != DRC_REQ_OWNER_CONVERTING);
                DTC_DRC_DEBUG_INF("[DCS][%u-%u][dcs try get page owner] failed, master_id:%u, session_inst_id:%u",
                    page_ids[i].file, page_ids[i].page, master_id, session->kernel->id);
                page_ids[i] = INVALID_PAGID;
                break;
            }
        }
    }
    /*
     * Original author's note: in the three cases below batch load is not possible, and flag
     * BUF_NEED_TRANSFER will set ctrl to NULL:
     *   1) this node is already owner, but the request was not queued, so dcs_set_ctrl4already_owner must not
     *      change the local lock_mode while another node may be claiming ownership concurrently;
     *   2) the page owner is another instance;
     *   3) acquiring ownership failed.
     * NOTE(review): case 1 conflicts with the DRC_REQ_OWNER_ALREADY_OWNER branch above, which does call
     * dcs_set_ctrl4already_owner and keeps the page id; confirm which behavior is intended.
     */
    cm_pop(session->stack);
    return ret;
}
/*
 * Send one MES_CMD_CLAIM_OWNER_REQ_BATCH message to the remote master, claiming exclusive ownership of
 * page_ids[0..count). The message is stamped with the current reform version. Returns the send status only;
 * the batch handler in this file sends no reply.
 */
static status_t dcs_claim_page_exclusive_owners_r(knl_session_t *session, page_id_t *page_ids, uint32 count,
    uint8 master_id)
{
    status_t ret;
    uint64 req_version = DRC_GET_CURR_REFORM_VERSION;
    msg_claim_owner_batch_t request;
    mes_init_send_head(&request.head, MES_CMD_CLAIM_OWNER_REQ_BATCH, sizeof(msg_claim_owner_batch_t), OG_INVALID_ID32,
        session->kernel->dtc_attr.inst_id, master_id, session->id, OG_INVALID_ID16);
    request.count = count;
    /*
     * destMax must be the destination's real capacity. Using the copy length for it (as before) disabled
     * memcpy_s's overflow check, so a count larger than request.page_ids could hold would overrun the message.
     */
    errno_t err_s = memcpy_s(request.page_ids, sizeof(request.page_ids), page_ids, count * sizeof(page_id_t));
    knl_securec_check(err_s);
    request.req_version = req_version;
    SYNC_POINT_GLOBAL_START(OGRAC_DCS_CLAIM_OWNER_SEND_FAIL, &ret, OG_ERROR);
    ret = dcs_send_data_retry(&request);
    SYNC_POINT_GLOBAL_END;
    DTC_DCS_DEBUG(ret, "[DCS]: after send batch claim, src_id=%u, dest_id=%u, count=%d", request.head.src_inst,
        request.head.dst_inst, count);
    return ret;
}
/*
 * Claim exclusive ownership of every valid page id in page_ids from master_id.
 * When this instance is the master, each page is claimed in-process with dcs_claim_ownership_l and the
 * per-page results are ignored; the function then returns OG_SUCCESS. Otherwise a single batch message is
 * sent and its send status is returned.
 */
status_t dcs_claim_page_exclusive_owners(knl_session_t *session, page_id_t *page_ids, uint32 count, uint8 master_id)
{
    uint64 req_version = DRC_GET_CURR_REFORM_VERSION;
#ifdef DB_DEBUG_VERSION
    char page_list[SIZE_K(2)] = { 0 };
    uint16 list_cap = SIZE_K(2);
    uint32 used = 0;
    int written;
    for (uint32 idx = 0; idx < count; idx++) {
        if (IS_INVALID_PAGID(page_ids[idx])) {
            continue;
        }
        written = snprintf_s(page_list + used, list_cap - used, list_cap - used - 1, "%u-%u, ", page_ids[idx].file,
            page_ids[idx].page);
        if (SECUREC_UNLIKELY(written == -1)) {
            knl_panic_log(0, "Secure C lib has thrown an error %d", written);
        }
        used += written;
    }
    DTC_DCS_DEBUG_INF("[DCS][try to claim exclusive page owner for pages]: %s", page_list);
#endif
    if (master_id != DCS_SELF_INSTID(session)) {
        knl_panic(master_id != OG_INVALID_ID8);
        return dcs_claim_page_exclusive_owners_r(session, page_ids, count, master_id);
    }
    for (uint32 idx = 0; idx < count; idx++) {
        if (!IS_INVALID_PAGID(page_ids[idx])) {
            (void)dcs_claim_ownership_l(session, page_ids[idx], DRC_LOCK_EXCLUSIVE, OG_FALSE, 0, req_version);
        }
    }
    return OG_SUCCESS;
}
/*
 * Handler for a recycle-owner message (msg_recycle_owner_req_t).
 * If the message came from another instance, first merge the sender's LSN and SCN into this node. Then call
 * drc_buf_res_recycle for each of the RECYCLE_PAGE_NUM entries, passing the sender id, the per-entry request
 * start time, and the request version.
 */
void dcs_process_recycle_owner(void *sess, mes_message_t *receive_msg)
{
    if (receive_msg->head->size != sizeof(msg_recycle_owner_req_t)) {
        OG_LOG_RUN_ERR("process recycle owner msg is invalid, msg size %u.", receive_msg->head->size);
        mes_release_message_buf(receive_msg->buffer);
        return;
    }

    knl_session_t *session = (knl_session_t *)sess;
    msg_recycle_owner_req_t *recycle_req = (msg_recycle_owner_req_t *)(receive_msg->buffer);
    bool32 from_peer = (recycle_req->head.src_inst != DCS_SELF_INSTID(session));
    if (from_peer) {
        dtc_update_lsn(session, recycle_req->owner_lsn);
        dtc_update_scn(session, recycle_req->owner_scn);
    }

    for (uint32 slot = 0; slot < RECYCLE_PAGE_NUM; slot++) {
        page_id_t pid = recycle_req->pageids[slot];
        DTC_DCS_DEBUG_INF(
            "[DCS][%u-%u][%s]: process recycle owner, owner_lsn=%llu, owner_scn=%llu, src_id=%d, req_time=%lld, num=%u",
            pid.file, pid.page, MES_CMD2NAME(recycle_req->head.cmd), recycle_req->owner_lsn,
            recycle_req->owner_scn, recycle_req->head.src_inst, recycle_req->req_start_times[slot], slot);
        drc_buf_res_recycle(session, recycle_req->head.src_inst, recycle_req->req_start_times[slot], pid,
            recycle_req->req_version);
    }
    mes_release_message_buf(receive_msg->buffer);
}
/*
 * Queue pages whose edp flag must be cleaned on this instance into the checkpoint's local edp
 * clean group, and trigger an incremental checkpoint.
 *
 * pages[] is sorted in place before being merged into the group. While the group is full, the
 * lock is dropped, a checkpoint is triggered and the merge is retried after a short sleep. The
 * remaining pages are dropped (with a warning) once the retry budget is exhausted or checkpoint
 * is disabled. Merging also stops if CKPT_CLOSED(session) becomes true.
 * knl_panic rejects calls made from the checkpoint session itself.
 *
 * @param session    calling session (must not be the checkpoint session)
 * @param pages      pages to clean; reordered in place
 * @param page_count number of entries in pages; 0 is a no-op
 */
void dcs_clean_edp_pages_local(knl_session_t *session, edp_page_info_t *pages, uint32 page_count)
{
    ckpt_context_t *ogx = &session->kernel->ckpt_ctx;
    uint32 i = 0;
    uint32 times = 0;

    if (page_count == 0) {
        return;
    }
    knl_panic(!OGRAC_CKPT_SESSION(session));
    ckpt_clean_edp_group_t *group = &ogx->local_edp_clean_group;
    OG_LOG_DEBUG_INF("[CKPT] local prepare to clean (%u) edp flag", page_count);
    ckpt_sort_page_id_array(pages, page_count);

    cm_spin_lock(&group->lock, NULL);
    while (i < page_count && !CKPT_CLOSED(session)) {
        /* presumably returns the index of the first page that did not fit into the group */
        i = ckpt_merge_to_array(pages, i, page_count - i, group->pages, &group->count, OG_CLEAN_EDP_GROUP_SIZE);
        if (i == page_count) {
            break;
        }
        /* group is full: release it, let the checkpoint drain it, then retry */
        cm_spin_unlock(&group->lock);
        ckpt_trigger(session, OG_FALSE, CKPT_TRIGGER_INC);
        if (times++ > CKPT_TRY_ADD_TO_GROUP_TIMES * 2 || !ogx->ckpt_enabled) {
            OG_LOG_DEBUG_WAR("[CKPT] local edp clean group is full when local prepare to clean (%u) edp flag "
                             "or ckpt is disabled %d",
                             page_count, ogx->ckpt_enabled);
            return;
        }
        cm_sleep(300);
        cm_spin_lock(&group->lock, NULL);
    }
    /* the lock is held here whether the loop ended by break or by its condition */
    cm_spin_unlock(&group->lock);
    ckpt_trigger(session, OG_FALSE, CKPT_TRIGGER_INC);
}
/*
 * Master-side processing of an edp-flag clean request for pages[start, end).
 *
 * For every alive instance i (this instance is skipped when called from the checkpoint session),
 * collect the pages instance i has to clean into a MES_CMD_CLEAN_EDP_REQ and send it. For this
 * instance outside the checkpoint session, the pages are cleaned locally through
 * dcs_clean_edp_pages_local instead. Finally, the DRC edp info of every page still in
 * [start, idx_end) is cleared.
 *
 * Per page:
 *   - no DRC edp info (buf res already recycled): sent to every alive instance;
 *   - edp_map marks this instance while running as the checkpoint session, or the DRC edp lsn is
 *     newer than the request's lsn: the page is swapped to the tail and excluded by shrinking
 *     idx_end. The exclusion persists for the remaining instances and for the final DRC cleanup;
 *   - edp_map == 0: sent to every alive instance, in case an earlier clean message was lost;
 *   - otherwise: sent only to the instances marked in edp_map.
 * NOTE: pages[] is reordered in place.
 *
 * @param session calling session (message buffer is pushed on its stack)
 * @param pages   edp page array; only [start, end) is examined
 * @param start   first index, inclusive
 * @param end     last index, exclusive; must not exceed length
 * @param length  upper bound accepted for end
 * @return OG_SUCCESS on an empty range or when processing completes (send failures to individual
 *         instances are only logged); OG_ERROR on end > length, stack push failure or an invalid
 *         page id.
 */
status_t dcs_master_clean_edp(knl_session_t *session, edp_page_info_t *pages, uint32 start, uint32 end, uint32 length)
{
    OG_LOG_DEBUG_INF("[CKPT][master process request to clean edp flag]: src_id=%u, count=%d", DCS_SELF_INSTID(session),
        end - start);
    if (start >= end) {
        return OG_SUCCESS;
    }
    if (end > length) {
        OG_LOG_RUN_ERR("[CKPT] invalid idx end %u", end);
        return OG_ERROR;
    }
    /* one outgoing message must be able to hold the whole range */
    knl_panic(end - start <= OG_CKPT_EDP_GROUP_SIZE(session));
    msg_ckpt_edp_request_t *msg = (msg_ckpt_edp_request_t *)cm_push(session->stack, OG_MSG_EDP_REQ_SIZE(session));
    if (msg == NULL) {
        OG_LOG_RUN_ERR("msg failed to malloc memory");
        return OG_ERROR;
    }
    status_t status;
    int32 idx_start = start;
    /* shrinks as pages are excluded; kept across iterations of the instance loop below */
    int32 idx_end = end;
    page_id_t page_id;
    drc_edp_info_t edp_info;
    status_t ret;
    cluster_view_t view;
    for (uint32 i = 0; i < g_dtc->profile.node_count; i++) {
        rc_get_cluster_view(&view, OG_FALSE);
        if (!rc_bitmap64_exist(&view.bitmap, i)) {
            OG_LOG_RUN_INF("[CKPT] inst id (%u) is not alive, alive bitmap: %llu", i, view.bitmap);
            continue;
        }
        if (i == DCS_SELF_INSTID(session) && OGRAC_CKPT_SESSION(session)) {
            continue;
        }
        /* rebuild the page list for instance i from the (possibly shrunk) range */
        msg->count = 0;
        idx_start = start;
        while (idx_start < idx_end) {
            page_id = pages[idx_start].page;
            if (page_id.file >= INVALID_FILE_ID || (page_id.page == 0 && page_id.file == 0)) {
                OG_LOG_RUN_ERR("[%u-%u] page_id invalid,", page_id.file, page_id.page);
                cm_pop(session->stack);
                return OG_ERROR;
            }
            ret = drc_get_edp_info(page_id, &edp_info);
            if (ret != OG_SUCCESS) {
                /* no DRC info left: send the page to every alive instance */
                msg->edp_pages[msg->count++] = pages[idx_start];
                idx_start++;
                OG_LOG_DEBUG_INF("[CKPT][%u-%u][master process failed to get edp info, buf res already recycled.",
                    page_id.file, page_id.page);
                continue;
            }
            if (MES_IS_INST_SEND(edp_info.edp_map, DCS_SELF_INSTID(session)) && OGRAC_CKPT_SESSION(session)) {
                OG_LOG_DEBUG_INF(
                    "[CKPT][%u-%u][master process ignore request to clean edp flag, owner transfer to other node",
                    page_id.file, page_id.page);
                /* exclude: move to the tail; idx_start is not advanced so the swapped-in page is examined */
                --idx_end;
                SWAP(edp_page_info_t, pages[idx_start], pages[idx_end]);
                continue;
            }
            if (edp_info.lsn > pages[idx_start].lsn) {
                OG_LOG_DEBUG_INF(
                    "[CKPT][%u-%u][master process ignore request to clean edp flag, drc edp has larger lsn (%lld) than clean request (%lld)",
                    page_id.file, page_id.page, edp_info.lsn, pages[idx_start].lsn);
                /* page was dirtied again after the request was made: exclude it */
                --idx_end;
                SWAP(edp_page_info_t, pages[idx_start], pages[idx_end]);
                continue;
            }
            if (edp_info.edp_map == 0) {
                OG_LOG_DEBUG_INF(
                    "[CKPT][%u-%u][edp map on master is 0, need to broadcast to clean edp in case previous clean edp msg is lost",
                    page_id.file, page_id.page);
            }
            if (edp_info.edp_map == 0 || MES_IS_INST_SEND(edp_info.edp_map, i)) {
                msg->edp_pages[msg->count++] = pages[idx_start];
            }
            idx_start++;
        }
        if (msg->count == 0) {
            continue;
        }
        if ((i == DCS_SELF_INSTID(session)) && !OGRAC_CKPT_SESSION(session)) {
            /* no need to message ourselves */
            dcs_clean_edp_pages_local(session, msg->edp_pages, msg->count);
            continue;
        }
        mes_init_send_head(&msg->head, MES_CMD_CLEAN_EDP_REQ, OG_MSG_EDP_REQ_SEND_SIZE(msg->count), OG_INVALID_ID32,
            DCS_SELF_INSTID(session), i, DCS_SELF_SID(session), OG_INVALID_ID16);
        status = dcs_send_data_retry((void *)msg);
        if (status != OG_SUCCESS) {
            /* NOTE(review): page_id here is only the last page examined, not necessarily one that was sent */
            OG_LOG_RUN_ERR("[CKPT][%u-%u] send message failed, inst id(%u) is not alive", page_id.file, page_id.page,
                i);
            continue;
        }
        OG_LOG_DEBUG_INF("[CKPT] broadcast clean (%d) edp flags to edp node %d", msg->count, i);
    }
    /* clear DRC edp info only for pages that were not excluded */
    for (idx_start = start; idx_start < idx_end; idx_start++) {
        page_id = pages[idx_start].page;
        (void)drc_clean_edp_info(pages[idx_start]);
        OG_LOG_DEBUG_INF("[CKPT][%u-%u][master process clean edp info", page_id.file, page_id.page);
    }
    cm_pop(session->stack);
    return OG_SUCCESS;
}
/*
 * Clean the edp flags of the pages collected in ogx->remote_edp_clean_group.
 *
 * The group is consumed up front (its count is reset), so on any error the collected pages are
 * dropped. Pages are partitioned in place:
 *   - [0, notify_master_idx): mastered by another instance. They are broadcast to all instances
 *     with MES_CMD_NOTIFY_MASTER_CLEAN_EDP_REQ so that their master can process them.
 *   - [notify_master_idx, count): mastered by this instance. They are processed here with
 *     dcs_master_clean_edp, in chunks of at most OG_CKPT_EDP_GROUP_SIZE pages.
 *
 * @return OG_SUCCESS when not clustered, nothing is queued, or all steps succeed. OG_ERROR on an
 *         invalid page id, master lookup failure, local master processing failure or stack push
 *         failure.
 */
status_t dcs_clean_edp(knl_session_t *session, ckpt_context_t *ogx)
{
    if (!DB_IS_CLUSTER(session) || ogx->remote_edp_clean_group.count == 0) {
        return OG_SUCCESS;
    }
    uint8 master_id;
    uint32 notify_master_idx = 0;
    errno_t ret;
    status_t status;
    edp_page_info_t *pages = ogx->remote_edp_clean_group.pages;
    uint32 count = ogx->remote_edp_clean_group.count;
    ogx->remote_edp_clean_group.count = 0;

    /* partition: pages mastered elsewhere go to the front, [0, notify_master_idx) */
    for (uint32 i = 0; i < count; i++) {
        if ((pages[i].page.page == 0 && pages[i].page.file == 0) || pages[i].page.file >= INVALID_FILE_ID) {
            /* page ids are printed as file-page, like everywhere else in this module */
            OG_LOG_RUN_ERR("[%u-%u][dcs] dcs clean edp pageid is invalid", pages[i].page.file, pages[i].page.page);
            return OG_ERROR;
        }
        if (drc_get_page_master_id(pages[i].page, &master_id) != OG_SUCCESS) {
            return OG_ERROR;
        }
        if (master_id != DCS_SELF_INSTID(session)) {
            SWAP(edp_page_info_t, pages[i], pages[notify_master_idx]);
            notify_master_idx++;
        }
    }

    /* locally mastered pages: [notify_master_idx, count), one message-sized chunk at a time */
    uint32 page_start = notify_master_idx;
    uint32 page_end = notify_master_idx;
    while (page_end < count) {
        page_end = MIN(page_start + OG_CKPT_EDP_GROUP_SIZE(session), count);
        status = dcs_master_clean_edp(session, pages, page_start, page_end, OG_CKPT_GROUP_SIZE(session) + 1);
        if (status != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[CKPT] master process local clean edp flag failed, notify_master_idx=%u",
                notify_master_idx);
            return OG_ERROR;
        }
        page_start = page_end;
    }

    if (notify_master_idx == 0) {
        return OG_SUCCESS;
    }

    /* remotely mastered pages: [0, notify_master_idx), broadcast in message-sized chunks */
    msg_ckpt_edp_request_t *msg = (msg_ckpt_edp_request_t *)cm_push(session->stack, OG_MSG_EDP_REQ_SIZE(session));
    if (msg == NULL) {
        OG_LOG_RUN_ERR("msg failed to malloc memory");
        return OG_ERROR;
    }
    uint32 page_left = notify_master_idx;
    uint32 page_sent = 0;
    while (page_left > 0) {
        msg->count = MIN(OG_CKPT_EDP_GROUP_SIZE(session), page_left);
        ret = memcpy_sp((char *)msg->edp_pages, msg->count * sizeof(edp_page_info_t),
            (char *)pages + page_sent * sizeof(edp_page_info_t), msg->count * sizeof(edp_page_info_t));
        knl_securec_check(ret);
        mes_init_send_head(&msg->head, MES_CMD_NOTIFY_MASTER_CLEAN_EDP_REQ, OG_MSG_EDP_REQ_SEND_SIZE(msg->count),
            OG_INVALID_ID32, g_dtc->profile.inst_id, 0, session->id, OG_INVALID_ID16);
        mes_broadcast(session->id, MES_BROADCAST_ALL_INST, msg, NULL);
        page_sent += msg->count;
        page_left -= msg->count;
    }
    OG_LOG_DEBUG_INF("[CKPT] broadcast (%u) clean edp flag, total %u edp pages to master", notify_master_idx, count);
    cm_pop(session->stack);
    return OG_SUCCESS;
}
/*
 * Ask the master instance of page_id for its DRC edp info and wait for the answer.
 *
 * @param session   requesting session (its id is used to receive the ack)
 * @param master_id instance that masters page_id; the request is addressed to it
 * @param page_id   page whose edp info is requested
 * @param edp_info  filled from the MES_CMD_EDPINFO_ACK on success
 * @return OG_SUCCESS when a well-formed MES_CMD_EDPINFO_ACK was received. OG_ERROR if sending or
 *         receiving fails, the master answered with another command (e.g. an error message), or
 *         the ack is too short.
 */
static inline status_t dcs_request_edpinfo_r(knl_session_t *session, uint8 master_id, page_id_t page_id,
    drc_edp_info_t *edp_info)
{
    msg_edpinfo_req_t req;
    /* unicast to the page's master (previously hard-coded to instance 0) */
    mes_init_send_head(&req.head, MES_CMD_EDPINFO_REQ, sizeof(msg_edpinfo_req_t), OG_INVALID_ID32,
        DCS_SELF_INSTID(session), master_id, DCS_SELF_SID(session), OG_INVALID_ID16);
    req.page_id = page_id;
    status_t ret = mes_send_data((void *)&req);
    DTC_DCS_DEBUG_INF("[DCS][%u-%u][%s]:dest_id=%u, dest_sid=%u, rsn=%u, result=%u", page_id.file, page_id.page,
        MES_CMD2NAME(req.head.cmd), req.head.dst_inst, req.head.dst_sid, req.head.rsn, ret);
    if (SECUREC_UNLIKELY(ret != OG_SUCCESS)) {
        return OG_ERROR;
    }
    mes_message_t msg;
    ret = mes_recv(session->id, &msg, OG_TRUE, req.head.rsn, MES_WAIT_MAX_TIME);
    DTC_DCS_DEBUG_INF("[DCS][%u-%u][edpinfo ack]:result=%u", page_id.file, page_id.page, ret);
    if (SECUREC_UNLIKELY(ret != OG_SUCCESS)) {
        return OG_ERROR;
    }
    if (msg.head->cmd != MES_CMD_EDPINFO_ACK) {
        ret = OG_ERROR;
    } else if (msg.head->size < sizeof(msg_edpinfo_ack_t)) {
        /* never read edp_info past the end of a truncated ack */
        OG_LOG_RUN_ERR("[DCS][%u-%u] edpinfo ack is invalid, msg size %u.", page_id.file, page_id.page,
            msg.head->size);
        ret = OG_ERROR;
    } else {
        msg_edpinfo_ack_t *ack = (msg_edpinfo_ack_t *)msg.buffer;
        *edp_info = ack->edp_info;
        ret = OG_SUCCESS;
    }
    mes_release_message_buf(msg.buffer);
    return ret;
}
/*
 * Fetch the DRC edp info of page_id from its master instance. When this instance is the master,
 * the info is read from the local DRC; otherwise a remote request is issued.
 * Returns OG_ERROR if the master cannot be determined or the lookup fails.
 */
status_t dcs_request_edpinfo(knl_session_t *session, page_id_t page_id, drc_edp_info_t *edp_info)
{
    uint8 master_id = OG_INVALID_ID8;

    if (drc_get_page_master_id(page_id, &master_id) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (master_id == DCS_SELF_INSTID(session)) {
        return drc_get_edp_info(page_id, edp_info);
    }
    return dcs_request_edpinfo_r(session, master_id, page_id, edp_info);
}
/*
 * Handle a "notify master clean edp" broadcast.
 *
 * After validating the message, pages mastered by other instances are moved to the front of
 * the carried array. The locally mastered tail is handed to dcs_master_clean_edp. Malformed
 * messages and pages with an invalid id or unknown master abort processing. The message buffer
 * is released on every path.
 */
void dcs_process_notify_master_clean_edp_req(void *sess, mes_message_t *receive_msg)
{
    knl_session_t *session = (knl_session_t *)sess;
    msg_ckpt_edp_request_t *req = (msg_ckpt_edp_request_t *)receive_msg->buffer;
    uint32 count;
    uint32 remote_cnt = 0;
    uint8 master_id;
    page_id_t page_id;

    /* the fixed header must be present before req->count can be trusted for the size check */
    if (sizeof(msg_ckpt_edp_request_t) > receive_msg->head->size ||
        OG_MSG_EDP_REQ_SEND_SIZE(req->count) != receive_msg->head->size) {
        OG_LOG_RUN_ERR("process notify master clean edp is invalid, msg size %u.", receive_msg->head->size);
        goto release;
    }

    count = req->count;
    if (count > OG_CKPT_EDP_GROUP_SIZE(session)) {
        OG_LOG_RUN_ERR("req->count(%d) err, larger than %u", count, OG_CKPT_EDP_GROUP_SIZE(session));
        goto release;
    }

    DTC_DCS_DEBUG_INF("[CKPT]master start to process clean edp flag req, src_id=%u, src_sid=%u, dest_id=%u, dest_sid=%u",
        req->head.src_inst, req->head.src_sid, req->head.dst_inst, req->head.dst_sid);

    for (uint32 idx = 0; idx < count; idx++) {
        page_id = req->edp_pages[idx].page;
        if (page_id.file >= INVALID_FILE_ID || (page_id.page == 0 && page_id.file == 0)) {
            OG_LOG_RUN_ERR("[%u-%u] page_id invalid,", page_id.file, page_id.page);
            goto release;
        }
        if (drc_get_page_master_id(page_id, &master_id) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("get master for page[%u-%u] failed,", page_id.file, page_id.page);
            goto release;
        }
        if (master_id != DCS_SELF_INSTID(session)) {
            SWAP(edp_page_info_t, req->edp_pages[idx], req->edp_pages[remote_cnt]);
            remote_cnt++;
        }
    }

    /* only [remote_cnt, count) is mastered here */
    (void)dcs_master_clean_edp(session, req->edp_pages, remote_cnt, count, OG_CKPT_EDP_GROUP_SIZE(session));

release:
    mes_release_message_buf(receive_msg->buffer);
}
/*
 * Handle a clean-edp request (presumably the MES_CMD_CLEAN_EDP_REQ sent by dcs_master_clean_edp):
 * validate the message and queue the carried pages for local edp cleaning through
 * dcs_clean_edp_pages_local. The message buffer is released on every path.
 */
void dcs_process_clean_edp_req(void *sess, mes_message_t *receive_msg)
{
    if (sizeof(msg_ckpt_edp_request_t) > receive_msg->head->size) {
        OG_LOG_RUN_ERR("process clean edp is invalid, msg size %u.", receive_msg->head->size);
        mes_release_message_buf(receive_msg->buffer);
        return;
    }
    msg_ckpt_edp_request_t *req = (msg_ckpt_edp_request_t *)receive_msg->buffer;
    /* the message must be exactly as long as the page count it claims */
    if (OG_MSG_EDP_REQ_SEND_SIZE(req->count) != receive_msg->head->size) {
        OG_LOG_RUN_ERR("process clean edp is invalid, msg size %u.", receive_msg->head->size);
        mes_release_message_buf(receive_msg->buffer);
        return;
    }
    knl_session_t *session = (knl_session_t *)sess;
    if (req->count > OG_CKPT_EDP_GROUP_SIZE(session)) {
        OG_LOG_RUN_ERR("req->count(%u) err, larger than %u", req->count, OG_CKPT_EDP_GROUP_SIZE(session));
        mes_release_message_buf(receive_msg->buffer);
        return;
    }
    dcs_clean_edp_pages_local(session, req->edp_pages, req->count);
    mes_release_message_buf(receive_msg->buffer);
}
/*
 * Serve a MES_CMD_EDPINFO_REQ: look up the requested page's edp info in the local DRC and answer
 * with MES_CMD_EDPINFO_ACK. If the lookup fails, an error message is returned instead.
 * The request buffer is released on every path.
 */
void dcs_process_edpinfo_req(void *sess, mes_message_t *receive_msg)
{
    if (receive_msg->head->size != sizeof(msg_edpinfo_req_t)) {
        OG_LOG_RUN_ERR("process edpinfo req is invalid, msg size %u.", receive_msg->head->size);
        mes_release_message_buf(receive_msg->buffer);
        return;
    }

    knl_session_t *session = (knl_session_t *)sess;
    msg_edpinfo_req_t *edp_req = (msg_edpinfo_req_t *)receive_msg->buffer;
    page_id_t page_id = edp_req->page_id;
    DTC_DCS_DEBUG_INF("[DCS][%u-%u][%s]: src_id=%u, src_sid=%u, dest_id=%u, dest_sid=%u", page_id.file, page_id.page,
        MES_CMD2NAME(edp_req->head.cmd), edp_req->head.src_inst, edp_req->head.src_sid, edp_req->head.dst_inst,
        edp_req->head.dst_sid);

    msg_edpinfo_ack_t ack;
    status_t ret = drc_get_edp_info(page_id, &ack.edp_info);
    if (SECUREC_UNLIKELY(ret != OG_SUCCESS)) {
        mes_send_error_msg(receive_msg->head);
    } else {
        mes_init_ack_head(&edp_req->head, &ack.head, MES_CMD_EDPINFO_ACK, sizeof(msg_edpinfo_ack_t), session->id);
        ret = mes_send_data(&ack);
        DTC_DCS_DEBUG_INF("[DCS][%u-%u][%s]: dest_id=%u, dest_sid=%u, result=%u", page_id.file, page_id.page,
            MES_CMD2NAME(ack.head.cmd), ack.head.dst_inst, ack.head.dst_sid, ret);
    }
    mes_release_message_buf(receive_msg->buffer);
}
/*
 * Serve a page request. The requested page is entered with a shared latch and its image is sent
 * back with a MES_CMD_PAGE_ACK head (head size + DEFAULT_PAGE_SIZE body).
 * Truncated requests are dropped without an answer.
 */
void dcs_process_page_req(void *sess, mes_message_t *msg)
{
    knl_session_t *session = (knl_session_t *)sess;

    /* validate before reading req->head / req->pagid out of the buffer */
    if (sizeof(dtc_page_req_t) > msg->head->size) {
        OG_LOG_RUN_ERR("process page req msg is invalid, msg size %u.", msg->head->size);
        mes_release_message_buf(msg->buffer);
        return;
    }

    dtc_page_req_t *req = (dtc_page_req_t *)msg->buffer;
    mes_message_head_t ack_head;
    mes_init_ack_head(&req->head, &ack_head, MES_CMD_PAGE_ACK,
        (DEFAULT_PAGE_SIZE(session) + sizeof(mes_message_head_t)), session->id);
    page_id_t pagid = req->pagid;
    /* everything needed from the request has been copied out; release it before the page I/O */
    mes_release_message_buf(msg->buffer);

    buf_enter_page(session, pagid, LATCH_MODE_S, 0);
    mes_send_data2(&ack_head, (void *)session->curr_page);
    buf_leave_page(session, OG_FALSE);
}
/*
 * Handle a page invalidation request (MES_CMD_INVLDT_REQ).
 *
 * Without MES_FLAG_OWNER, the local read-only copy is invalidated and MES_CMD_BROADCAST_ACK is
 * returned. With MES_FLAG_OWNER, the local owner copy is invalidated and MES_CMD_INVLDT_ACK is
 * returned. The invalidation result travels back in the ack head's status field. If
 * DRC_STOP_DCS_IO_FOR_REFORMING holds for the request's version, the invalidation is refused
 * with OG_ERROR. Malformed requests are dropped without an ack. The session wait opened at
 * entry is closed on every path.
 */
void dcs_process_invld_req(void *sess, mes_message_t *msg)
{
    knl_begin_session_wait(sess, DCS_INVLDT_READONLY_PROCESS, OG_TRUE);
    if (sizeof(dtc_page_req_t) != msg->head->size) {
        OG_LOG_RUN_ERR("process invld req msg is invalid, msg size %u.", msg->head->size);
        mes_release_message_buf(msg->buffer);
        knl_end_session_wait(sess, DCS_INVLDT_READONLY_PROCESS);
        return;
    }
    dtc_page_req_t *req = (dtc_page_req_t *)msg->buffer;
    mes_message_head_t ack_head = { 0 };
    knl_session_t *session = (knl_session_t *)sess;
    page_id_t pagid = req->pagid;
    if (IS_INVALID_PAGID(pagid)) {
        OG_LOG_RUN_ERR("[DCS][%u-%u] process invalid req failed, page_id invalid", pagid.file, pagid.page);
        mes_release_message_buf(msg->buffer);
        knl_end_session_wait(sess, DCS_INVLDT_READONLY_PROCESS);
        return;
    }
    bool32 is_owner = (req->head.flags & MES_FLAG_OWNER);
    mes_command_t ack_type = is_owner ? MES_CMD_INVLDT_ACK : MES_CMD_BROADCAST_ACK;
    status_t ret;
    SYNC_POINT_GLOBAL_START(OGRAC_DCS_INVALID_PROC_ABORT, NULL, 0);
    SYNC_POINT_GLOBAL_END;
    mes_init_ack_head(&(req->head), &ack_head, ack_type, sizeof(mes_message_head_t), session->id);
    DTC_DCS_DEBUG_INF("[DCS][%u-%u][%s]: started, src_id=%u, src_sid=%u, owner=%d", pagid.file, pagid.page,
        MES_CMD2NAME(req->head.cmd), req->head.src_inst, req->head.src_sid, is_owner);
    if (DRC_STOP_DCS_IO_FOR_REFORMING(req->req_version, session, pagid)) {
        OG_LOG_RUN_ERR("[DCS][%u-%u]reforming, invalidate copy failed, req_version=%llu, cur_version=%llu", pagid.file,
            pagid.page, req->req_version, DRC_GET_CURR_REFORM_VERSION);
        ret = OG_ERROR;
    } else {
        if (!is_owner) {
            ret = buf_invalidate_page_with_version(session, pagid, req->req_version);
        } else {
            ret = buf_invalidate_page_owner(session, pagid, req->req_version);
        }
    }
    mes_release_message_buf(msg->buffer);
    /* the invalidation result is reported to the requester through the ack status */
    ack_head.status = ret;
    ret = dcs_send_data_retry(&ack_head);
    DTC_DCS_DEBUG(ret,
        "[DCS][%u-%u][%s]: finished, status(%d), invalidate status(%d),dest_id=%u, dest_sid=%u, owner=%d",
        pagid.file, pagid.page, MES_CMD2NAME(ack_head.cmd), ret, ack_head.status, ack_head.dst_inst,
        ack_head.dst_sid, is_owner);
    knl_end_session_wait(sess, DCS_INVLDT_READONLY_PROCESS);
}
exception - don't send msg to specified instance for some scenarios.
*/
status_t dcs_invalidate_readonly_copy(knl_session_t *session, page_id_t page_id, uint64 readonly_copies,
uint8 exception, uint64 req_version)
{
knl_begin_session_wait(session, DCS_INVLDT_READONLY_REQ, OG_TRUE);
uint64 invld_insts = readonly_copies;
if (exception >= OG_MAX_INSTANCES) {
OG_LOG_DEBUG_ERR("invalid inst id(%u)", exception);
return OG_ERROR;
}
if (exception != OG_INVALID_ID8) {
drc_bitmap64_clear(&invld_insts, exception);
}
if (DRC_STOP_DCS_IO_FOR_REFORMING(req_version, session, page_id)) {
OG_LOG_RUN_ERR_LIMIT(LOG_PRINT_INTERVAL_SECOND_20,
"[DCS][%u-%u]reforming, invalidate copy failed,, req_version=%llu, cur_version=%llu",
page_id.file, page_id.page, req_version, DRC_GET_CURR_REFORM_VERSION);
knl_end_session_wait(session, DCS_INVLDT_READONLY_REQ);
return OG_ERROR;
}
if (drc_bitmap64_exist(&invld_insts, DCS_SELF_INSTID(session))) {
if (buf_invalidate_page_with_version(session, page_id, req_version) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[DCS][%u-%u]reforming, invalidate copy failed,, req_version=%llu, cur_version=%llu",
page_id.file, page_id.page, req_version, DRC_GET_CURR_REFORM_VERSION);
return OG_ERROR;
}
drc_bitmap64_clear(&invld_insts, DCS_SELF_INSTID(session));
}
if (!invld_insts) {
knl_end_session_wait(session, DCS_INVLDT_READONLY_REQ);
return OG_SUCCESS;
}
dtc_page_req_t req;
mes_init_send_head(&req.head, MES_CMD_INVLDT_REQ, sizeof(dtc_page_req_t), OG_INVALID_ID32, DCS_SELF_INSTID(session),
0, session->id, OG_INVALID_ID16);
req.pagid = page_id;
req.req_version = req_version;
if (DRC_STOP_DCS_IO_FOR_REFORMING(req_version, session, page_id)) {
OG_LOG_RUN_ERR("[DCS][%u-%u]reforming, invalidate copy failed, req_version=%llu, cur_version=%llu",
page_id.file, page_id.page, req_version, DRC_GET_CURR_REFORM_VERSION);
knl_end_session_wait(session, DCS_INVLDT_READONLY_REQ);
return OG_ERROR;
}
status_t ret = mes_broadcast_data_and_wait_with_retry(session->id, invld_insts, (void *)&req, DCS_WAIT_MSG_TIMEOUT,
DCS_MAX_RETRY_TIEMS);
DTC_DCS_DEBUG_INF("[DCS][%u-%u][invalidate readonly copy]: copy_insts=%llu, invld_insts=%llu, exception=%u, ret=%d",
page_id.file, page_id.page, readonly_copies, invld_insts, exception, ret);
knl_end_session_wait(session, DCS_INVLDT_READONLY_REQ);
return ret;
}
/*
 * Invalidate the owner copy of page_id on instance owner_id.
 *
 * Locally this calls buf_invalidate_page_owner(). Remotely, a MES_CMD_INVLDT_REQ flagged
 * MES_FLAG_OWNER is sent, and the result is taken from the status field of the reply head.
 * Returns OG_ERROR when the request cannot be sent or no reply arrives. The session wait is
 * closed on every path through the single exit label.
 */
status_t dcs_invalidate_page_owner(knl_session_t *session, page_id_t page_id, uint8 owner_id, uint64 req_version)
{
    dtc_page_req_t req;
    mes_message_t reply;
    status_t status;

    knl_begin_session_wait(session, DCS_INVLDT_READONLY_REQ, OG_TRUE);

    if (owner_id == DCS_SELF_INSTID(session)) {
        status = buf_invalidate_page_owner(session, page_id, req_version);
        goto end_wait;
    }

    mes_init_send_head(&req.head, MES_CMD_INVLDT_REQ, sizeof(dtc_page_req_t), OG_INVALID_ID32, DCS_SELF_INSTID(session),
        owner_id, session->id, OG_INVALID_ID16);
    req.pagid = page_id;
    req.head.flags = MES_FLAG_OWNER;
    req.req_version = req_version;

    if (dcs_send_data_retry(&req) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DCS][%u-%u][%s]: mes send data failed, src_id=%u, src_sid=%u, dest_id=%u, dest_sid=%u",
            page_id.file, page_id.page, MES_CMD2NAME(req.head.cmd), req.head.src_inst, req.head.src_sid,
            req.head.dst_inst, req.head.dst_sid);
        status = OG_ERROR;
        goto end_wait;
    }

    if (mes_recv(session->id, &reply, OG_TRUE, req.head.rsn, MES_WAIT_MAX_TIME) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DCS][%u-%u][%s]: invalidate owner time out, src_id=%u, src_sid=%u, dest_id=%u, dest_sid=%u",
            page_id.file, page_id.page, MES_CMD2NAME(req.head.cmd), req.head.src_inst, req.head.src_sid,
            req.head.dst_inst, req.head.dst_sid);
        status = OG_ERROR;
        goto end_wait;
    }

    /* the remote invalidation result travels back in the reply head */
    status = reply.head->status;
    DTC_DCS_DEBUG(status, "[DCS][%u-%u][%s]: src_id=%u, src_sid=%u, dest_id=%u, dest_sid=%u, result=%d", page_id.file,
        page_id.page, MES_CMD2NAME(req.head.cmd), req.head.src_inst, req.head.src_sid, req.head.dst_inst,
        req.head.dst_sid, status);
    mes_release_message_buf(reply.buffer);

end_wait:
    knl_end_session_wait(session, DCS_INVLDT_READONLY_REQ);
    return status;
}
/*
 * Handle a DDL sync broadcast. The message is a msg_ddl_info_t header followed by log_len bytes
 * of log entries.
 *
 * The entry chain is first walked to check it tiles the payload exactly. Then info->scn is
 * applied with dtc_update_scn, every entry is replayed with dtc_refresh_ddl, and
 * MES_CMD_DDL_BROADCAST_ACK is returned to the sender. On a validation or replay failure no ack
 * is sent (the log text says the sender retries). The message buffer is released on every path.
 */
void dcs_process_ddl_broadcast(void *sess, mes_message_t *msg)
{
    if (sizeof(msg_ddl_info_t) > msg->head->size) {
        OG_LOG_RUN_ERR("msg ddl broadcast is invalid, msg size %u.", msg->head->size);
        mes_release_message_buf(msg->buffer);
        return;
    }
    msg_ddl_info_t *info = (msg_ddl_info_t *)((char *)msg->buffer);
    if (sizeof(msg_ddl_info_t) + info->log_len != msg->head->size) {
        OG_LOG_RUN_ERR("msg ddl broadcast is invalid, msg size %u.", msg->head->size);
        mes_release_message_buf(msg->buffer);
        return;
    }
    knl_scn_t lamport_scn = info->scn;
    log_entry_t *log = NULL;
    uint32 log_len = info->log_len;
    /* equals msg->head->size (checked above), so it cannot overflow */
    uint32 total_len = (uint32)sizeof(msg_ddl_info_t) + log_len;
    uint32 verify_offset = (uint32)sizeof(msg_ddl_info_t);
    uint32 offset = (uint32)sizeof(msg_ddl_info_t);
    mes_message_head_t head;
    knl_session_t *session = (knl_session_t *)sess;

    /*
     * Walk the chain before replaying anything. A zero-sized entry would loop forever, and an
     * entry larger than the remaining payload would step past the message. Either one leaves
     * verify_offset short of total_len and the message is rejected.
     */
    while (verify_offset < total_len) {
        log = (log_entry_t *)((char *)info + verify_offset);
        if (log->size == 0 || log->size > total_len - verify_offset) {
            break;
        }
        verify_offset += log->size;
    }
    if (verify_offset != total_len) {
        OG_LOG_RUN_ERR(
            "log len(%u) and offset(%u) is not invalid, not process this sync ddl message, wait retry message", log_len,
            verify_offset);
        mes_release_message_buf(msg->buffer);
        return;
    }
    dtc_update_scn(session, lamport_scn);
    SYNC_POINT_GLOBAL_START(OGRAC_SYNC_DDL_BEFORE_BCAST_ABORT, NULL, 0);
    SYNC_POINT_GLOBAL_END;
    while (offset < total_len) {
        log = (log_entry_t *)((char *)info + offset);
        if (dtc_refresh_ddl(session, log) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("refresh ddl failed, not process this sync ddl message, wait retry message");
            mes_release_message_buf(msg->buffer);
            return;
        }
        offset += log->size;
    }
    mes_init_ack_head(msg->head, &head, MES_CMD_DDL_BROADCAST_ACK, sizeof(mes_message_head_t), OG_INVALID_ID16);
    mes_release_message_buf(msg->buffer);
    if (mes_send_data(&head) != OG_SUCCESS) {
        CM_ASSERT(0);
    }
#ifdef LOG_DIAG
    knl_panic(!session->atomic_op);
#endif
    knl_panic(!DB_IS_PRIMARY(&session->kernel->db) || !DB_IS_READONLY(session));
    knl_panic(session->page_stack.depth == 0);
    knl_panic(session->dirty_count == 0);
    knl_panic(session->changed_count == 0);
    knl_panic(OGRAC_REPLAY_NODE(session));
}
/*
 * Fill the CR-specific fields of a PCR request from a CR cursor.
 * The message head is not touched, and force_cvt always starts cleared.
 */
static void dcs_init_pcr_request(knl_session_t *session, cr_cursor_t *cursor, cr_type_t type,
    msg_pcr_request_t *request)
{
    /* which page, which kind of CR, on behalf of which transaction */
    request->page_id = GET_ROWID_PAGE(cursor->rowid);
    request->cr_type = type;
    request->xid = cursor->xid;

    /* read snapshot of the querying statement */
    request->query_scn = cursor->query_scn;
    request->ssn = cursor->ssn;

    /* state carried along as the request travels between instances */
    request->cleanout = cursor->cleanout;
    request->ssi_conflict = cursor->ssi_conflict;
    request->force_cvt = 0;
}
/*
 * Forward a PCR request so CR construction continues on another instance.
 *
 * Normally the request goes to dst_id. When RMO CR is enabled, or the cursor asks for local CR,
 * it is instead sent back to the request's source instance with local_cr set. The cursor's
 * cleanout/ssi_conflict flags are copied into the request first.
 * Returns OG_SUCCESS if the message was sent, OG_ERROR otherwise.
 */
static status_t dcs_send_pcr_request(knl_session_t *session, msg_pcr_request_t *request, cr_cursor_t *cursor,
    uint8 dst_id)
{
    bool8 local_cr = (g_dtc->profile.enable_rmo_cr || cursor->local_cr);

    if (local_cr) {
        request->head.dst_inst = request->head.src_inst;
    } else {
        request->head.dst_inst = dst_id;
    }
    request->ssi_conflict = cursor->ssi_conflict;
    request->cleanout = cursor->cleanout;
    request->local_cr = local_cr;

    DTC_DCS_DEBUG_INF("[PCR][%u-%u][send pcr request] cr_type %u query_scn %llu query_ssn %u "
        "src_inst %u src_sid %u dst_inst %u local_cr %u force_cvt %u",
        (uint32)request->page_id.file, (uint32)request->page_id.page, request->cr_type,
        request->query_scn, request->ssn, request->head.src_inst, request->head.src_sid, dst_id,
        request->local_cr, request->force_cvt);

    return (mes_send_data(request) == OG_SUCCESS) ? OG_SUCCESS : OG_ERROR;
}
/*
 * Return a finished CR page to the requester. The reply is a MES_CMD_PCR_ACK head followed by
 * the DEFAULT_PAGE_SIZE page image, and it carries the cursor's cleanout/ssi_conflict flags and
 * the request's force_cvt. On send failure ERR_MES_SEND_DATA_FAIL is thrown and OG_ERROR is
 * returned.
 */
static status_t dcs_send_pcr_ack(knl_session_t *session, msg_pcr_request_t *request, char *page, cr_cursor_t *cursor)
{
    msg_pcr_ack_t ack;
    status_t status = OG_SUCCESS;

    mes_init_ack_head(&request->head, &ack.head, MES_CMD_PCR_ACK, (sizeof(msg_pcr_ack_t) + DEFAULT_PAGE_SIZE(session)),
        session->id);
    ack.head.src_inst = session->kernel->id;
    CM_ASSERT(request->head.dst_inst == session->kernel->id);
    ack.force_cvt = request->force_cvt;
    ack.cleanout = cursor->cleanout;
    ack.ssi_conflict = cursor->ssi_conflict;

    DTC_DCS_DEBUG_INF("[PCR][%u-%u][send pcr ack] cr_type %u src_inst %u src_sid %u dst_inst %u dst_sid %u force_cvt %u",
        (uint32)request->page_id.file, (uint32)request->page_id.page, request->cr_type, ack.head.src_inst,
        ack.head.src_sid, ack.head.dst_inst, ack.head.dst_sid, ack.force_cvt);

    SYNC_POINT_GLOBAL_START(OGRAC_PCR_ACK_FAIL, &status, OG_ERROR);
    status = mes_send_data3(&ack.head, sizeof(msg_pcr_ack_t), page);
    SYNC_POINT_GLOBAL_END;

    if (status == OG_SUCCESS) {
        return OG_SUCCESS;
    }
    OG_THROW_ERROR(ERR_MES_SEND_DATA_FAIL, "pcr ack");
    return OG_ERROR;
}
/*
 * Tell the requester of a CR page that it must wait for transaction wxid before retrying.
 * A MES_CMD_TXN_WAIT reply (head + wxid) is built with mes_init_ack_head from the request.
 * Returns OG_SUCCESS if the message was sent, OG_ERROR otherwise.
 */
status_t dcs_send_txn_wait(knl_session_t *session, msg_pcr_request_t *request, xid_t wxid)
{
    msg_txn_wait_t wait_msg;
    status_t status = OG_SUCCESS;

    mes_init_ack_head(&request->head, &wait_msg.head, MES_CMD_TXN_WAIT, sizeof(msg_txn_wait_t), session->id);
    wait_msg.head.src_inst = session->kernel->id;
    CM_ASSERT(request->head.dst_inst == session->kernel->id);
    wait_msg.wxid = wxid;

    DTC_DCS_DEBUG_INF("[PCR][%u-%u][send txn wait] wxid %u-%u-%u src_inst %u src_sid %u dst_inst %u dst_sid %u",
        (uint32)request->page_id.file, (uint32)request->page_id.page, wxid.xmap.seg_id, wxid.xmap.slot,
        wxid.xnum, wait_msg.head.src_inst, wait_msg.head.src_sid, wait_msg.head.dst_inst, wait_msg.head.dst_sid);

    SYNC_POINT_GLOBAL_START(OGRAC_TXN_WAIT_SEND_FAIL, &status, OG_ERROR);
    status = mes_send_data(&wait_msg);
    SYNC_POINT_GLOBAL_END;

    return (status == OG_SUCCESS) ? OG_SUCCESS : OG_ERROR;
}
/*
 * Prepare a CR cursor for continuing a remote heap PCR request on this instance.
 * The wait xid starts invalid, the cursor is marked remote, and local CR starts disabled.
 */
static inline void dcs_heap_init_cr_cursor(cr_cursor_t *cr_cursor, msg_pcr_request_t *request)
{
    knl_set_rowid_page(&cr_cursor->rowid, request->page_id);

    /* transaction / snapshot copied from the request */
    cr_cursor->query_scn = request->query_scn;
    cr_cursor->ssn = request->ssn;
    cr_cursor->xid = request->xid;
    cr_cursor->wxid.value = OG_INVALID_ID64;

    /* flags that travel with the request */
    cr_cursor->cleanout = request->cleanout;
    cr_cursor->ssi_conflict = request->ssi_conflict;

    cr_cursor->local_cr = OG_FALSE;
    cr_cursor->is_remote = OG_TRUE;
}
/*
 * Continue constructing a heap CR page received from another instance. The page image follows
 * the msg_pcr_request_t header and must carry the requested page id with a consistent PCN.
 *
 * Invisible itls whose transaction belongs to this instance are reorganized locally with
 * pcrh_reorganize_with_ud_list. The loop ends when:
 *   - no invisible itl remains: the page is returned with MES_CMD_PCR_ACK;
 *   - a transaction to wait for is found (wxid set): MES_CMD_TXN_WAIT is sent;
 *   - the itl belongs to another instance (or local CR is requested): the request is forwarded
 *     with dcs_send_pcr_request.
 * Returns OG_ERROR on an inconsistent page or any failed step.
 */
static status_t dcs_heap_construct_cr_page(knl_session_t *session, msg_pcr_request_t *request)
{
    heap_page_t *cr_page = (heap_page_t *)((char *)request + sizeof(msg_pcr_request_t));
    cr_cursor_t cr_cursor;
    uint8 inst_id;

    if (!IS_SAME_PAGID(AS_PAGID(cr_page->head.id), request->page_id) || !CHECK_PAGE_PCN((page_head_t *)cr_page)) {
        /* both page ids are printed as file-page */
        OG_LOG_RUN_ERR("dcs handle pcr req is invalid, cr page[%u-%u], request page[%u-%u], page head pcn %u, page "
                       "tail pcn %u",
            AS_PAGID(cr_page->head.id).file, AS_PAGID(cr_page->head.id).page, request->page_id.file,
            request->page_id.page, ((page_head_t *)cr_page)->pcn, PAGE_TAIL((page_head_t *)cr_page)->pcn);
        return OG_ERROR;
    }
    dtc_flush_log(session, request->page_id);
    dcs_heap_init_cr_cursor(&cr_cursor, request);
    for (;;) {
        if (pcrh_fetch_invisible_itl(session, &cr_cursor, cr_page) != OG_SUCCESS) {
            return OG_ERROR;
        }
        if (cr_cursor.itl == NULL) {
            if (dcs_send_pcr_ack(session, request, (char *)cr_page, &cr_cursor) != OG_SUCCESS) {
                return OG_ERROR;
            }
            return OG_SUCCESS;
        }
        if (cr_cursor.wxid.value != OG_INVALID_ID64) {
            if (dcs_send_txn_wait(session, request, cr_cursor.wxid) != OG_SUCCESS) {
                return OG_ERROR;
            }
            return OG_SUCCESS;
        }
        inst_id = xid_get_inst_id(session, cr_cursor.itl->xid);
        if (inst_id == session->kernel->id && !cr_cursor.local_cr) {
            if (pcrh_reorganize_with_ud_list(session, &cr_cursor, cr_page, NULL) != OG_SUCCESS) {
                return OG_ERROR;
            }
        } else {
            return dcs_send_pcr_request(session, request, &cr_cursor, inst_id);
        }
    }
}
/*
 * Prepare a CR cursor for continuing a remote btree PCR request on this instance.
 * request is assumed to actually be a msg_btree_request_t, which additionally carries the index
 * entry and profile; the cursor's profile points into the request buffer.
 */
static inline void dcs_btree_init_cr_cursor(cr_cursor_t *cr_cursor, msg_pcr_request_t *request)
{
    msg_btree_request_t *btree_req = (msg_btree_request_t *)request;

    knl_set_rowid_page(&cr_cursor->rowid, request->page_id);

    /* transaction / snapshot copied from the request */
    cr_cursor->query_scn = request->query_scn;
    cr_cursor->ssn = request->ssn;
    cr_cursor->xid = request->xid;
    cr_cursor->wxid.value = OG_INVALID_ID64;

    /* flags that travel with the request */
    cr_cursor->cleanout = request->cleanout;
    cr_cursor->ssi_conflict = request->ssi_conflict;

    /* btree-only context */
    cr_cursor->entry = btree_req->entry;
    cr_cursor->profile = &btree_req->profile;

    cr_cursor->local_cr = OG_FALSE;
    cr_cursor->is_remote = OG_TRUE;
}
/*
 * Continue constructing a btree CR page received from another instance. The page image follows
 * the msg_btree_request_t header.
 *
 * Repeatedly picks the next invisible itl. If none is left, the page is returned with
 * MES_CMD_PCR_ACK. If a transaction must be waited for, MES_CMD_TXN_WAIT is sent. If the itl
 * belongs to another instance (or local CR is requested), the request is forwarded. Otherwise
 * the page is reorganized with the local undo list and the loop continues.
 */
static status_t dcs_btree_construct_cr_page(knl_session_t *session, msg_pcr_request_t *request)
{
    btree_page_t *cr_page = (btree_page_t *)((char *)request + sizeof(msg_btree_request_t));
    cr_cursor_t cr_cursor;
    uint8 owner_inst;

    dtc_flush_log(session, request->page_id);
    dcs_btree_init_cr_cursor(&cr_cursor, request);

    for (;;) {
        if (pcrb_get_invisible_itl(session, &cr_cursor, cr_page) != OG_SUCCESS) {
            return OG_ERROR;
        }

        /* everything visible: hand the page back */
        if (cr_cursor.itl == NULL) {
            return (dcs_send_pcr_ack(session, request, (char *)cr_page, &cr_cursor) == OG_SUCCESS) ? OG_SUCCESS
                                                                                                   : OG_ERROR;
        }

        /* requester must wait for an active transaction */
        if (cr_cursor.wxid.value != OG_INVALID_ID64) {
            return (dcs_send_txn_wait(session, request, cr_cursor.wxid) == OG_SUCCESS) ? OG_SUCCESS : OG_ERROR;
        }

        owner_inst = xid_get_inst_id(session, cr_cursor.itl->xid);
        if (owner_inst != session->kernel->id || cr_cursor.local_cr) {
            return dcs_send_pcr_request(session, request, &cr_cursor, owner_inst);
        }

        if (pcrb_reorganize_with_undo_list(session, &cr_cursor, cr_page) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }
}
/*
 * Validate an incoming PCR request and dispatch it to the heap or btree CR constructor by
 * cr_type. On a size mismatch, an unsupported cr_type or a construction failure, an error
 * message is sent back to the requester. The caller releases the message buffer.
 */
static inline void dcs_handle_pcr_request(knl_session_t *session, mes_message_t *msg)
{
    if (msg->head->size < sizeof(msg_pcr_request_t)) {
        OG_LOG_RUN_ERR("dcs handle pcr req is invalid, msg size %u.", msg->head->size);
        mes_send_error_msg(msg->head);
        return;
    }

    msg_pcr_request_t *request = (msg_pcr_request_t *)(msg->buffer);
    switch (request->cr_type) {
        case CR_TYPE_HEAP:
            if (msg->head->size != sizeof(msg_pcr_request_t) + DEFAULT_PAGE_SIZE(session)) {
                OG_LOG_RUN_ERR("dcs handle pcr req is invalid, msg size %u.", msg->head->size);
                mes_send_error_msg(msg->head);
                return;
            }
            if (dcs_heap_construct_cr_page(session, request) != OG_SUCCESS) {
                mes_send_error_msg(msg->head);
            }
            break;
        case CR_TYPE_BTREE:
            if (msg->head->size != sizeof(msg_btree_request_t) + DEFAULT_PAGE_SIZE(session)) {
                OG_LOG_RUN_ERR("dcs handle pcr req is invalid, msg size %u.", msg->head->size);
                mes_send_error_msg(msg->head);
                return;
            }
            if (dcs_btree_construct_cr_page(session, request) != OG_SUCCESS) {
                mes_send_error_msg(msg->head);
            }
            break;
        default:
            OG_THROW_ERROR(ERR_CAPABILITY_NOT_SUPPORT);
            mes_send_error_msg(msg->head);
            break;
    }
}
/*
 * Entry point for PCR requests.
 *
 * A request from another instance is processed here and its buffer released. A request whose
 * source is this instance (e.g. sent back for local CR) is delivered to the originating session
 * as an ack: dst_sid is set to src_sid and the message goes to mes_process_msg_ack, which then
 * takes over the buffer.
 */
void dcs_process_pcr_request(void *sess, mes_message_t *msg)
{
    knl_session_t *session = (knl_session_t *)sess;

    if (msg->head->src_inst != session->kernel->id) {
        dcs_handle_pcr_request(session, msg);
        mes_release_message_buf(msg->buffer);
        return;
    }

    msg->head->dst_sid = msg->head->src_sid;
    mes_process_msg_ack(sess, msg);
}
/*
 * Check that a reply which carries a page image holds head_size bytes of message header followed
 * by one full page, so that copying the page never reads past the received message.
 */
static inline bool32 dcs_pcr_reply_has_page(knl_session_t *session, const mes_message_t *message, size_t head_size)
{
    return ((size_t)message->head->size >= head_size + (size_t)DEFAULT_PAGE_SIZE(session)) ? OG_TRUE : OG_FALSE;
}

/*
 * Decode a reply received by a session waiting on a PCR / check-visible exchange.
 *
 * @param session   waiting session
 * @param message   received message (not released here)
 * @param cursor    CR cursor; cleanout, ssi_conflict, local_cr, itl or wxid are updated depending
 *                  on the reply
 * @param page_buf  buffer of DEFAULT_PAGE_SIZE bytes; receives the page image for
 *                  MES_CMD_PCR_REQ, MES_CMD_PCR_ACK and MES_CMD_CHECK_VISIBLE replies
 * @param status    out: next PCR state, for the replies that define one
 * @param is_found  out: visibility result for MES_CMD_CHECK_VISIBLE_ACK. It may be NULL for
 *                  callers that do not expect that reply; such a reply is then rejected.
 * @return OG_SUCCESS, or OG_ERROR for an error reply, a malformed or unexpected reply, or an
 *         unknown command
 */
static status_t dcs_pcr_process_message(knl_session_t *session, mes_message_t *message, cr_cursor_t *cursor,
    char *page_buf, pcr_status_t *status, bool32 *is_found)
{
    char *recv_page = NULL;
    errno_t ret;

    switch (message->head->cmd) {
        case MES_CMD_MASTER_ACK_NEED_LOAD:
        case MES_CMD_MASTER_ACK_ALREADY_OWNER:
            *status = PCR_LOCAL_READ;
            break;
        case MES_CMD_PCR_REQ_MASTER:
            *status = PCR_CHECK_MASTER;
            break;
        case MES_CMD_PCR_REQ: {
            /* the request was routed back to us together with the (partially constructed) page */
            msg_pcr_request_t *reply = (msg_pcr_request_t *)(message->buffer);
            uint32 head_size = (reply->cr_type != CR_TYPE_BTREE) ? sizeof(msg_pcr_request_t)
                                                                  : sizeof(msg_btree_request_t);
            if (!dcs_pcr_reply_has_page(session, message, head_size)) {
                OG_THROW_ERROR(ERR_MES_ILEGAL_MESSAGE, "invalid MES message size");
                return OG_ERROR;
            }
            recv_page = (char *)reply + head_size;
            cursor->cleanout = reply->cleanout;
            cursor->ssi_conflict = reply->ssi_conflict;
            cursor->local_cr = reply->local_cr;
            ret = memcpy_sp(page_buf, DEFAULT_PAGE_SIZE(session), recv_page, DEFAULT_PAGE_SIZE(session));
            knl_securec_check(ret);
            *status = PCR_CONSTRUCT;
            if (reply->force_cvt) {
                buf_set_force_request(session, reply->page_id);
            }
            break;
        }
        case MES_CMD_PCR_ACK: {
            /* a finished CR page */
            msg_pcr_ack_t *ack = (msg_pcr_ack_t *)(message->buffer);
            if (!dcs_pcr_reply_has_page(session, message, sizeof(msg_pcr_ack_t))) {
                OG_THROW_ERROR(ERR_MES_ILEGAL_MESSAGE, "invalid MES message size");
                return OG_ERROR;
            }
            recv_page = (char *)ack + sizeof(msg_pcr_ack_t);
            cursor->cleanout = ack->cleanout;
            cursor->ssi_conflict = ack->ssi_conflict;
            ret = memcpy_sp(page_buf, DEFAULT_PAGE_SIZE(session), recv_page, DEFAULT_PAGE_SIZE(session));
            knl_securec_check(ret);
            *status = PCR_PAGE_VISIBLE;
            cursor->itl = NULL;
            if (ack->force_cvt) {
                buf_set_force_request(session, GET_ROWID_PAGE(cursor->rowid));
            }
            break;
        }
        case MES_CMD_CHECK_VISIBLE: {
            msg_cr_check_t *check = (msg_cr_check_t *)(message->buffer);
            if (!dcs_pcr_reply_has_page(session, message, sizeof(msg_cr_check_t))) {
                OG_THROW_ERROR(ERR_MES_ILEGAL_MESSAGE, "invalid MES message size");
                return OG_ERROR;
            }
            cursor->local_cr = check->local_cr;
            recv_page = (char *)check + sizeof(msg_cr_check_t);
            ret = memcpy_sp(page_buf, DEFAULT_PAGE_SIZE(session), recv_page, DEFAULT_PAGE_SIZE(session));
            knl_securec_check(ret);
            break;
        }
        case MES_CMD_CHECK_VISIBLE_ACK: {
            /* only dcs_check_current_visible passes a result slot; never write through NULL */
            if (is_found == NULL) {
                OG_THROW_ERROR(ERR_MES_ILEGAL_MESSAGE, "invalid MES message type");
                return OG_ERROR;
            }
            *is_found = *(bool32 *)MES_MESSAGE_BODY(message);
            cursor->itl = NULL;
            break;
        }
        case MES_CMD_TXN_WAIT:
            cursor->wxid = *(xid_t *)MES_MESSAGE_BODY(message);
            *status = PCR_TRY_READ;
            break;
        case MES_CMD_ERROR_MSG:
            mes_handle_error_msg(message->buffer);
            return OG_ERROR;
        default:
            OG_THROW_ERROR(ERR_MES_ILEGAL_MESSAGE, "invalid MES message type");
            return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Ask instance dst_id to construct a consistent-read version of a PCR heap page.
 *
 * The request header (msg_pcr_request_t) is sent together with the page image in `page`. The
 * reply is decoded by dcs_pcr_process_message, which may overwrite `page` and update `cursor`.
 *
 * Returns OG_ERROR only when a reply was received but is an error or cannot be decoded. If the
 * send or the receive (DCS_CR_REQ_TIMEOUT) fails, the failure is logged, the error is cleared,
 * the session sleeps MES_MSG_RETRY_TIME, and OG_SUCCESS is returned with `page` and `cursor`
 * left untouched.
 */
status_t dcs_heap_request_cr_page(knl_session_t *session, cr_cursor_t *cursor, char *page, uint8 dst_id)
{
    msg_pcr_request_t req;
    mes_message_t reply;
    pcr_status_t reply_status;
    status_t send_ret = OG_SUCCESS;
    uint16 msg_size = (uint16)(sizeof(msg_pcr_request_t) + DEFAULT_PAGE_SIZE(session));

    mes_init_send_head(&req.head, MES_CMD_PCR_REQ, msg_size, OG_INVALID_ID32, session->kernel->id, dst_id,
        session->id, OG_INVALID_ID16);
    dcs_init_pcr_request(session, cursor, CR_TYPE_HEAP, &req);
    DTC_DCS_DEBUG_INF("[PCR][%u-%u][request cr page] cr_type %u query_scn %llu query_ssn %u "
        "src_inst %u src_sid %u dst_inst %u",
        (uint32)req.page_id.file, (uint32)req.page_id.page, req.cr_type, req.query_scn,
        req.ssn, session->kernel->id, session->id, dst_id);

    knl_begin_session_wait(session, PCR_REQ_HEAP_PAGE, OG_TRUE);
    SYNC_POINT_GLOBAL_START(OGRAC_PCR_REQ_HEAP_PAGE_SEND_FAIL, &send_ret, OG_ERROR);
    send_ret = mes_send_data3(&req.head, sizeof(msg_pcr_request_t), page);
    SYNC_POINT_GLOBAL_END;

    if (send_ret == OG_SUCCESS &&
        mes_recv(session->id, &reply, OG_TRUE, req.head.rsn, DCS_CR_REQ_TIMEOUT) == OG_SUCCESS) {
        status_t decode_ret;

        knl_end_session_wait(session, PCR_REQ_HEAP_PAGE);
        decode_ret = dcs_pcr_process_message(session, &reply, cursor, page, &reply_status, NULL);
        mes_release_message_buf(reply.buffer);
        return (decode_ret == OG_SUCCESS) ? OG_SUCCESS : OG_ERROR;
    }

    /* transport failure: let the caller retry */
    knl_end_session_wait(session, PCR_REQ_HEAP_PAGE);
    DTC_DCS_DEBUG_INF("[PCR][%u-%u][request cr page failed] cr_type %u query_scn %llu query_ssn %u "
        "src_inst %u src_sid %u dst_inst %u",
        (uint32)req.page_id.file, (uint32)req.page_id.page, req.cr_type, req.query_scn,
        req.ssn, session->kernel->id, session->id, dst_id);
    cm_reset_error();
    cm_sleep(MES_MSG_RETRY_TIME);
    return OG_SUCCESS;
}
/*
 * Ask instance dst_id to construct a consistent-read version of a PCR btree page.
 *
 * The request is sent as msg_btree_request_t (the PCR request plus the cursor's entry and index
 * profile), followed by the page image in `page`. The reply is decoded by
 * dcs_pcr_process_message, which may overwrite `page` and update `cursor`.
 *
 * Returns OG_ERROR only when a reply was received but is an error or cannot be decoded. If the
 * send or the receive fails, the failure is logged, the error is cleared, the session sleeps
 * MES_MSG_RETRY_TIME, and OG_SUCCESS is returned with `page` and `cursor` left untouched.
 */
status_t dcs_btree_request_cr_page(knl_session_t *session, cr_cursor_t *cursor, char *page, uint8 dst_id)
{
    msg_btree_request_t btree_req;
    msg_pcr_request_t *req = &btree_req.pcr_request;
    mes_message_t reply;
    pcr_status_t reply_status;
    status_t send_ret = OG_SUCCESS;
    uint16 msg_size = (uint16)(sizeof(msg_btree_request_t) + DEFAULT_PAGE_SIZE(session));

    mes_init_send_head(&req->head, MES_CMD_PCR_REQ, msg_size, OG_INVALID_ID32, session->kernel->id, dst_id,
        session->id, OG_INVALID_ID16);
    dcs_init_pcr_request(session, cursor, CR_TYPE_BTREE, req);
    btree_req.entry = cursor->entry;
    btree_req.profile = *cursor->profile;
    DTC_DCS_DEBUG_INF("[PCR][%u-%u][request cr page] cr_type %u query_scn %llu query_ssn %u "
        "src_inst %u src_sid %u dst_inst %u",
        (uint32)req->page_id.file, (uint32)req->page_id.page, req->cr_type,
        req->query_scn, req->ssn, session->kernel->id, session->id, dst_id);

    knl_begin_session_wait(session, PCR_REQ_BTREE_PAGE, OG_TRUE);
    SYNC_POINT_GLOBAL_START(OGRAC_PCR_REQ_BTREE_PAGE_SEND_FAIL, &send_ret, OG_ERROR);
    send_ret = mes_send_data3(&req->head, sizeof(msg_btree_request_t), page);
    SYNC_POINT_GLOBAL_END;

    if (send_ret == OG_SUCCESS &&
        mes_recv(session->id, &reply, OG_TRUE, req->head.rsn, DCS_CR_REQ_TIMEOUT) == OG_SUCCESS) {
        status_t decode_ret;

        knl_end_session_wait(session, PCR_REQ_BTREE_PAGE);
        decode_ret = dcs_pcr_process_message(session, &reply, cursor, page, &reply_status, NULL);
        mes_release_message_buf(reply.buffer);
        return (decode_ret == OG_SUCCESS) ? OG_SUCCESS : OG_ERROR;
    }

    /* transport failure: let the caller retry */
    knl_end_session_wait(session, PCR_REQ_BTREE_PAGE);
    DTC_DCS_DEBUG_INF("[PCR][%u-%u][request cr page failed] cr_type %u query_scn %llu query_ssn %u "
        "src_inst %u src_sid %u dst_inst %u",
        (uint32)req->page_id.file, (uint32)req->page_id.page, req->cr_type,
        req->query_scn, req->ssn, session->kernel->id, session->id, dst_id);
    cm_reset_error();
    cm_sleep(MES_MSG_RETRY_TIME);
    return OG_SUCCESS;
}
/*
 * Send a header-only PCR request (MES_CMD_PCR_REQ_MASTER) to the page master and decode its
 * reply into *status / page_buf / cursor via dcs_pcr_process_message.
 *
 * Must be called with *status == PCR_REQUEST_MASTER. The wait time is added to the session's
 * dcs_net_time. If the send or the receive fails, the error is cleared, *status is set back to
 * PCR_CHECK_MASTER, the session sleeps MES_MSG_RETRY_TIME, and OG_SUCCESS is returned.
 * OG_ERROR is returned only for an error or undecodable reply.
 */
status_t dcs_pcr_request_master(knl_session_t *session, cr_cursor_t *cursor, char *page_buf, uint8 master_id,
    cr_type_t type, pcr_status_t *status)
{
    msg_pcr_request_t req;
    mes_message_t reply;
    status_t send_ret = OG_SUCCESS;

    knl_panic(*status == PCR_REQUEST_MASTER);
    mes_init_send_head(&req.head, MES_CMD_PCR_REQ_MASTER, sizeof(msg_pcr_request_t), OG_INVALID_ID32,
        session->kernel->id, master_id, session->id, OG_INVALID_ID16);
    dcs_init_pcr_request(session, cursor, type, &req);
    DTC_DCS_DEBUG_INF("[PCR][%u-%u][request master] cr_type %u query_scn %llu query_ssn %u "
        "src_inst %u src_sid %u dst_inst %u",
        (uint32)req.page_id.file, (uint32)req.page_id.page, type, req.query_scn, req.ssn,
        session->kernel->id, session->id, master_id);

    knl_begin_session_wait(session, PCR_REQ_MASTER, OG_TRUE);
    SYNC_POINT_GLOBAL_START(OGRAC_PCR_REQ_MASTER_SEND_FAIL, &send_ret, OG_ERROR);
    send_ret = mes_send_data(&req);
    SYNC_POINT_GLOBAL_END;

    if (send_ret == OG_SUCCESS &&
        mes_recv(session->id, &reply, OG_TRUE, req.head.rsn, DCS_CR_REQ_TIMEOUT) == OG_SUCCESS) {
        status_t decode_ret;

        knl_end_session_wait(session, PCR_REQ_MASTER);
        session->stat->dcs_net_time += session->wait_pool[PCR_REQ_MASTER].usecs;
        decode_ret = dcs_pcr_process_message(session, &reply, cursor, page_buf, status, NULL);
        mes_release_message_buf(reply.buffer);
        return (decode_ret == OG_SUCCESS) ? OG_SUCCESS : OG_ERROR;
    }

    /* transport failure: restart from the master lookup */
    knl_end_session_wait(session, PCR_REQ_MASTER);
    DTC_DCS_DEBUG_INF("[PCR][%u-%u][request master failed] cr_type %u query_scn %llu query_ssn %u "
        "src_inst %u src_sid %u dst_inst %u",
        (uint32)req.page_id.file, (uint32)req.page_id.page, type, req.query_scn, req.ssn,
        session->kernel->id, session->id, master_id);
    cm_reset_error();
    *status = PCR_CHECK_MASTER;
    cm_sleep(MES_MSG_RETRY_TIME);
    return OG_SUCCESS;
}
/*
 * Send a header-only PCR request (MES_CMD_PCR_REQ_OWNER) directly to the page owner and decode
 * its reply into *status / page_buf / cursor via dcs_pcr_process_message.
 *
 * Must be called with *status == PCR_REQUEST_OWNER. The wait time is added to the session's
 * dcs_net_time. If the send or the receive fails, the error is cleared, *status is set back to
 * PCR_CHECK_MASTER, the session sleeps MES_MSG_RETRY_TIME, and OG_SUCCESS is returned.
 * OG_ERROR is returned only for an error or undecodable reply.
 */
status_t dcs_pcr_request_owner(knl_session_t *session, cr_cursor_t *cursor, char *page_buf, uint8 owner_id,
    cr_type_t type, pcr_status_t *status)
{
    msg_pcr_request_t req;
    mes_message_t reply;
    status_t send_ret = OG_SUCCESS;

    knl_panic(*status == PCR_REQUEST_OWNER);
    mes_init_send_head(&req.head, MES_CMD_PCR_REQ_OWNER, sizeof(msg_pcr_request_t), OG_INVALID_ID32,
        session->kernel->id, owner_id, session->id, OG_INVALID_ID16);
    dcs_init_pcr_request(session, cursor, type, &req);
    DTC_DCS_DEBUG_INF("[PCR][%u-%u][request owner] cr_type %u query_scn %llu query_ssn %u "
        "src_inst %u src_sid %u dst_inst %u",
        (uint32)req.page_id.file, (uint32)req.page_id.page, type, req.query_scn, req.ssn,
        session->kernel->id, session->id, owner_id);

    knl_begin_session_wait(session, PCR_REQ_OWNER, OG_TRUE);
    SYNC_POINT_GLOBAL_START(OGRAC_PCR_REQ_OWNER_SEND_FAIL, &send_ret, OG_ERROR);
    send_ret = mes_send_data(&req);
    SYNC_POINT_GLOBAL_END;

    if (send_ret == OG_SUCCESS &&
        mes_recv(session->id, &reply, OG_TRUE, req.head.rsn, DCS_CR_REQ_TIMEOUT) == OG_SUCCESS) {
        status_t decode_ret;

        knl_end_session_wait(session, PCR_REQ_OWNER);
        session->stat->dcs_net_time += session->wait_pool[PCR_REQ_OWNER].usecs;
        decode_ret = dcs_pcr_process_message(session, &reply, cursor, page_buf, status, NULL);
        mes_release_message_buf(reply.buffer);
        return (decode_ret == OG_SUCCESS) ? OG_SUCCESS : OG_ERROR;
    }

    /* transport failure: restart from the master lookup */
    knl_end_session_wait(session, PCR_REQ_OWNER);
    DTC_DCS_DEBUG_INF("[PCR][%u-%u][request owner failed] cr_type %u query_scn %llu query_ssn %u "
        "src_inst %u src_sid %u dst_inst %u",
        (uint32)req.page_id.file, (uint32)req.page_id.page, type, req.query_scn, req.ssn,
        session->kernel->id, session->id, owner_id);
    cm_reset_error();
    *status = PCR_CHECK_MASTER;
    cm_sleep(MES_MSG_RETRY_TIME);
    return OG_SUCCESS;
}
/*
 * Decide where a PCR read of page_id must go next.
 *
 * - The master is another instance: *status = PCR_REQUEST_MASTER and *dst_id = master.
 * - This instance is the master and the page has no owner (or we own it): *status = PCR_LOCAL_READ.
 * - This instance is the master and another instance owns the page: *status = PCR_REQUEST_OWNER
 *   and *dst_id = owner.
 *
 * Must be called with *status == PCR_CHECK_MASTER. Returns OG_ERROR only if the master lookup
 * fails. `type` is not used here.
 */
status_t dcs_pcr_check_master(knl_session_t *session, page_id_t page_id, cr_type_t type, uint8 *dst_id,
    pcr_status_t *status)
{
    uint8 master_id;
    /*
     * drc_get_page_owner_id's result is deliberately ignored below. Start from "no owner" so that
     * a lookup which does not write owner_id never leaves it indeterminate.
     */
    uint8 owner_id = OG_INVALID_ID8;

    knl_panic(*status == PCR_CHECK_MASTER);
    if (drc_get_page_master_id(page_id, &master_id) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (master_id != session->kernel->id) {
        *status = PCR_REQUEST_MASTER;
        *dst_id = master_id;
        return OG_SUCCESS;
    }

    (void)drc_get_page_owner_id(session, page_id, &owner_id, NULL);
    if (owner_id == OG_INVALID_ID8 || owner_id == session->kernel->id) {
        *status = PCR_LOCAL_READ;
    } else {
        *status = PCR_REQUEST_OWNER;
        *dst_id = owner_id;
    }
    return OG_SUCCESS;
}
/*
 * Forward a PCR request to the page master on behalf of its original requester.
 *
 * If this instance is itself the master, nothing is sent and *local_route is set to OG_TRUE so
 * the caller can handle the request locally. Otherwise the header is re-initialised as
 * MES_CMD_PCR_REQ_MASTER to the master. The original size, rsn, src_inst and src_sid are kept,
 * so the master answers the requester directly.
 * Returns OG_ERROR if the master lookup or the send fails.
 */
static status_t dcs_pcr_reroute_request(knl_session_t *session, msg_pcr_request_t *request, bool32 *local_route)
{
    uint8 master;
    status_t send_ret;

    if (drc_get_page_master_id(request->page_id, &master) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (master != session->kernel->id) {
        mes_init_send_head(&request->head, MES_CMD_PCR_REQ_MASTER, request->head.size, request->head.rsn,
            request->head.src_inst, master, request->head.src_sid, OG_INVALID_ID16);
        DTC_DCS_DEBUG_INF("[PCR][%u-%u][reroute request] cr_type %u query_scn %llu query_ssn %u "
            "src_inst %u src_sid %u dst_inst %u",
            (uint32)request->page_id.file, (uint32)request->page_id.page, request->cr_type,
            request->query_scn, request->ssn, request->head.src_inst, request->head.src_sid, master);
        send_ret = mes_send_data(request);
        return (send_ret == OG_SUCCESS) ? OG_SUCCESS : OG_ERROR;
    }

    *local_route = OG_TRUE;
    return OG_SUCCESS;
}
/*
 * Build a CR page for a header-only heap PCR request on the instance that should hold the page.
 *
 * The page is read with an S latch at request->query_scn, using ENTER_PAGE_FROM_REMOTE. A new
 * request is built on the session stack: a copy of the original header, re-marked as
 * MES_CMD_PCR_REQ and followed by the page image. The page is then released and the new request
 * is handed to dcs_heap_construct_cr_page.
 *
 * @param session      kernel session executing the request
 * @param request      received request (callers have validated it is header-only)
 * @param local_route  out: reset to OG_FALSE here. It is set to OG_TRUE by dcs_pcr_reroute_request
 *                     when no current page was obtained and this instance is the page master.
 * @return OG_SUCCESS, or OG_ERROR if the read, the page-type check, the stack push, the re-route
 *         or the CR construction fails
 */
static status_t dcs_process_heap_pcr_construct(knl_session_t *session, msg_pcr_request_t *request, bool32 *local_route)
{
    buf_read_assist_t ra;
    msg_pcr_request_t *new_req = NULL;
    heap_page_t *page = NULL;
    *local_route = OG_FALSE;
    dtc_read_init(&ra, request->page_id, LATCH_MODE_S, ENTER_PAGE_NORMAL | ENTER_PAGE_FROM_REMOTE, request->query_scn,
        DTC_BUF_READ_ONE);
    if (dtc_read_page(session, &ra) != OG_SUCCESS) {
        return OG_ERROR;
    }
    page = (heap_page_t *)CURR_PAGE(session);
    if (page == NULL) {
        /* no current page was entered (NOTE(review): presumably not cached here any more - confirm);
         * forward the request to the master instead */
        return dcs_pcr_reroute_request(session, request, local_route);
    }
    if (page->head.type != PAGE_TYPE_PCRH_DATA) {
        /* the page is no longer PCR heap data: report the table as dropped */
        buf_leave_page(session, OG_FALSE);
        OG_THROW_ERROR(ERR_OBJECT_ALREADY_DROPPED, "table");
        return OG_ERROR;
    }
    /* header + one page image; popped on both exit paths below */
    new_req = (msg_pcr_request_t *)cm_push(session->stack, sizeof(msg_pcr_request_t) + DEFAULT_PAGE_SIZE(session));
    if (new_req == NULL) {
        buf_leave_page(session, OG_FALSE);
        OG_LOG_RUN_ERR("send_msg failed to malloc memory, send_msg size %u.",
            (uint32_t)(sizeof(msg_pcr_request_t) + DEFAULT_PAGE_SIZE(session)));
        return OG_ERROR;
    }
    *new_req = *(msg_pcr_request_t *)request;
    new_req->head.cmd = MES_CMD_PCR_REQ;
    new_req->head.size = (uint16)(sizeof(msg_pcr_request_t) + DEFAULT_PAGE_SIZE(session));
    errno_t ret = memcpy_sp((char *)new_req + sizeof(msg_pcr_request_t), DEFAULT_PAGE_SIZE(session), (void *)page,
        DEFAULT_PAGE_SIZE(session));
    knl_securec_check(ret);
    /* once the page's remote access count reaches the configured limit, set force_cvt so the
     * requester calls buf_set_force_request for it */
    if (g_dtc->profile.remote_access_limit != 0 &&
        (uint32)session->curr_page_ctrl->remote_access >= g_dtc->profile.remote_access_limit) {
        new_req->force_cvt = 1;
    }
    /* the page image has been copied: drop the latch before CR construction works on the copy */
    buf_leave_page(session, OG_FALSE);
    /* NOTE(review): presumably moves the local SCN forward to the query SCN - confirm */
    dtc_update_scn(session, request->query_scn);
    if (dcs_heap_construct_cr_page(session, new_req) != OG_SUCCESS) {
        cm_pop(session->stack);
        return OG_ERROR;
    }
    cm_pop(session->stack);
    return OG_SUCCESS;
}
/*
 * Master reply telling the requester it already owns the page (MES_CMD_MASTER_ACK_ALREADY_OWNER).
 * The requester then reads its local copy. A send failure is ignored.
 */
static void dcs_send_already_owner(knl_session_t *session, mes_message_t *msg)
{
    mes_message_head_t ack;

    mes_init_ack_head(msg->head, &ack, MES_CMD_MASTER_ACK_ALREADY_OWNER,
        sizeof(mes_message_head_t), DCS_SELF_SID(session));
    (void)mes_send_data(&ack);
}
/*
 * Master reply for a page that currently has no owner (MES_CMD_MASTER_ACK_NEED_LOAD).
 * The requester then reads the page locally. A send failure is ignored.
 */
static void dcs_send_grant_owner(knl_session_t *session, mes_message_t *msg)
{
    mes_message_head_t ack;

    mes_init_ack_head(msg->head, &ack, MES_CMD_MASTER_ACK_NEED_LOAD,
        sizeof(mes_message_head_t), DCS_SELF_SID(session));
    (void)mes_send_data(&ack);
}
/*
 * Forward a PCR request from the master to the page owner as MES_CMD_PCR_REQ_OWNER.
 * Size, rsn, src_inst and src_sid are preserved, so the owner answers the original requester.
 * A send failure is ignored.
 */
static void dcs_route_pcr_request_owner(knl_session_t *session, msg_pcr_request_t *request, uint8 owner_id)
{
    mes_init_send_head(&request->head, MES_CMD_PCR_REQ_OWNER,
        request->head.size, request->head.rsn,
        request->head.src_inst, owner_id,
        request->head.src_sid, OG_INVALID_ID16);
    (void)mes_send_data(request);
}
/*
 * Master-side handling of a header-only PCR request.
 *
 * Looks up the current owner of request->page_id:
 *  - no owner:                   reply MES_CMD_MASTER_ACK_NEED_LOAD;
 *  - owner is the requester:     reply MES_CMD_MASTER_ACK_ALREADY_OWNER;
 *  - owner is another instance:  forward the request to it as MES_CMD_PCR_REQ_OWNER;
 *  - owner is this instance:     construct the CR page here (heap only).
 * If local construction obtains no current page and finds this instance is the master, it sets
 * local_route and the owner lookup is repeated.
 *
 * Invalid page ids, construction failures and unsupported CR types are answered with
 * mes_send_error_msg. msg->buffer is not released here.
 */
static inline void dcs_handle_pcr_req_master(knl_session_t *session, mes_message_t *msg)
{
    msg_pcr_request_t *request = (msg_pcr_request_t *)(msg->buffer);
    uint8 owner_id = OG_INVALID_ID8;
    bool32 local_route = OG_TRUE;

    if (request->page_id.file >= INVALID_FILE_ID || (request->page_id.page == 0 && request->page_id.file == 0)) {
        OG_LOG_RUN_ERR("[%u-%u] page_id invalid,", request->page_id.file, request->page_id.page);
        mes_send_error_msg(msg->head);
        return;
    }
    while (local_route) {
        /*
         * The lookup result is deliberately ignored. Reset before every lookup so that a call which
         * does not write owner_id never leaves it indeterminate or stale from a previous round.
         */
        owner_id = OG_INVALID_ID8;
        (void)drc_get_page_owner_id(session, request->page_id, &owner_id, NULL);
        if (owner_id == OG_INVALID_ID8) {
            dcs_send_grant_owner(session, msg);
            break;
        }
        if (owner_id == msg->head->src_inst) {
            dcs_send_already_owner(session, msg);
            break;
        }
        if (owner_id != session->kernel->id) {
            dcs_route_pcr_request_owner(session, request, owner_id);
            break;
        }
        local_route = OG_FALSE;
        if (request->cr_type == CR_TYPE_HEAP) {
            if (dcs_process_heap_pcr_construct(session, request, &local_route) != OG_SUCCESS) {
                mes_send_error_msg(msg->head);
            }
        } else {
            OG_THROW_ERROR(ERR_CAPABILITY_NOT_SUPPORT);
            mes_send_error_msg(msg->head);
        }
    }
}
* requester would receive REQ_MASTER message sent from itself
*/
void dcs_process_pcr_req_master(void *sess, mes_message_t *msg)
{
knl_session_t *session = (knl_session_t *)sess;
if (msg->head->src_inst == session->kernel->id) {
msg->head->dst_sid = msg->head->src_sid;
mes_process_msg_ack(sess, msg);
return;
}
if (sizeof(msg_pcr_request_t) != msg->head->size) {
OG_LOG_RUN_ERR("process pcr req master is invalid, msg size %u.", msg->head->size);
mes_release_message_buf(msg->buffer);
return;
}
dcs_handle_pcr_req_master(session, msg);
mes_release_message_buf(msg->buffer);
}
* requester would not receive REQ_OWNER message sent from itself
*/
void dcs_process_pcr_req_owner(void *sess, mes_message_t *msg)
{
msg_pcr_request_t *request = (msg_pcr_request_t *)(msg->buffer);
bool32 local_route = OG_FALSE;
knl_session_t *session = (knl_session_t *)sess;
if (sizeof(msg_pcr_request_t) != msg->head->size) {
OG_LOG_RUN_ERR("pcr req owner is invalid, msg size %u.", msg->head->size);
mes_send_error_msg(msg->head);
mes_release_message_buf(msg->buffer);
return;
}
if (request->page_id.file >= INVALID_FILE_ID || (request->page_id.page == 0 && request->page_id.file == 0)) {
OG_LOG_RUN_ERR("[%u-%u] page_id invalid,", request->page_id.file, request->page_id.page);
mes_send_error_msg(&request->head);
mes_release_message_buf(msg->buffer);
return;
}
if (request->cr_type == CR_TYPE_HEAP) {
if (dcs_process_heap_pcr_construct(session, request, &local_route) != OG_SUCCESS) {
mes_send_error_msg(msg->head);
}
} else {
OG_THROW_ERROR(ERR_CAPABILITY_NOT_SUPPORT);
mes_send_error_msg(msg->head);
}
if (local_route) {
dcs_handle_pcr_req_master(session, msg);
}
mes_release_message_buf(msg->buffer);
}
/*
 * Initialise a CR cursor for a remote current-visibility check from the fields carried in the
 * check message. No wait xid is set, the cursor is marked remote, and the progress flags start
 * cleared.
 */
static inline void dcs_init_check_cursor(cr_cursor_t *cr_cursor, msg_cr_check_t *check)
{
    /* row, transaction and snapshot being checked */
    cr_cursor->rowid = check->rowid;
    cr_cursor->xid = check->xid;
    cr_cursor->query_scn = check->query_scn;
    cr_cursor->ssn = check->ssn;
    cr_cursor->wxid.value = OG_INVALID_ID64;

    /* state flags */
    cr_cursor->is_remote = OG_TRUE;
    cr_cursor->local_cr = OG_FALSE;
    cr_cursor->cleanout = OG_FALSE;
    cr_cursor->ssi_conflict = OG_FALSE;
}
/*
 * Ask instance dst_id whether cursor->rowid is visible in the current version of a heap page.
 *
 * A msg_cr_check_t header (rowid, xid, query_scn, ssn) is sent with the page image in `page`. The
 * reply is decoded by dcs_pcr_process_message:
 *  - MES_CMD_CHECK_VISIBLE_ACK stores the result in *is_found;
 *  - MES_CMD_CHECK_VISIBLE returns a page image into `page`.
 *
 * Returns OG_ERROR only for an error or undecodable reply. If the send or the receive fails, the
 * failure is logged, the error is cleared, the session sleeps MES_MSG_RETRY_TIME, and OG_SUCCESS
 * is returned without touching *is_found.
 */
status_t dcs_check_current_visible(knl_session_t *session, cr_cursor_t *cursor, char *page, uint8 dst_id,
    bool32 *is_found)
{
    msg_cr_check_t check;
    mes_message_t message;
    pcr_status_t status;
    uint16 size = (uint16)(sizeof(msg_cr_check_t) + DEFAULT_PAGE_SIZE(session));
    mes_init_send_head(&check.head, MES_CMD_CHECK_VISIBLE, size, OG_INVALID_ID32, session->kernel->id, dst_id,
        session->id, OG_INVALID_ID16);
    check.rowid = cursor->rowid;
    check.xid = cursor->xid;
    check.query_scn = cursor->query_scn;
    check.ssn = cursor->ssn;
    /* every header field is sent on the wire; do not leak an indeterminate stack byte */
    check.local_cr = OG_FALSE;
    DTC_DCS_DEBUG_INF("[PCR][%u-%u-%u][check current visible] query_scn %llu query_ssn %u "
        "src_inst %u src_sid %u dst_inst %u",
        (uint32)check.rowid.file, (uint32)check.rowid.page, (uint32)check.rowid.slot, check.query_scn,
        check.ssn, check.head.src_inst, check.head.src_sid, dst_id);
    for (;;) {
        knl_begin_session_wait(session, PCR_CHECK_CURR_VISIBLE, OG_TRUE);
        status_t ret = OG_SUCCESS;
        SYNC_POINT_GLOBAL_START(OGRAC_HEAP_CHECK_VISIBLE_SEND_FAIL, &ret, OG_ERROR);
        ret = mes_send_data3(&check.head, sizeof(msg_cr_check_t), page);
        SYNC_POINT_GLOBAL_END;
        if (ret != OG_SUCCESS) {
            knl_end_session_wait(session, PCR_CHECK_CURR_VISIBLE);
            break;
        }
        if (mes_recv(session->id, &message, OG_TRUE, check.head.rsn, DCS_CR_REQ_TIMEOUT) != OG_SUCCESS) {
            knl_end_session_wait(session, PCR_CHECK_CURR_VISIBLE);
            break;
        }
        knl_end_session_wait(session, PCR_CHECK_CURR_VISIBLE);
        if (dcs_pcr_process_message(session, &message, cursor, page, &status, is_found) != OG_SUCCESS) {
            mes_release_message_buf(message.buffer);
            return OG_ERROR;
        }
        mes_release_message_buf(message.buffer);
        return OG_SUCCESS;
    }
    /* transport failure: let the caller retry */
    DTC_DCS_DEBUG_ERR("[PCR][%u-%u-%u][check current visible failed] query_scn %llu query_ssn %u "
        "src_inst %u src_sid %u dst_inst %u",
        (uint32)check.rowid.file, (uint32)check.rowid.page, (uint32)check.rowid.slot, check.query_scn,
        check.ssn, check.head.src_inst, check.head.src_sid, dst_id);
    cm_reset_error();
    cm_sleep(MES_MSG_RETRY_TIME);
    return OG_SUCCESS;
}
/*
 * Forward a visibility check (header plus the page that follows it in the same buffer) to the
 * next instance.
 *
 * When remote-CR is enabled in the profile (enable_rmo_cr) or the cursor asks for local CR, the
 * message goes back to the original requester (src_inst) with local_cr set. Otherwise it goes to
 * dst_id. Returns OG_ERROR if the send fails.
 */
static inline status_t dcs_send_check_visible(knl_session_t *session, msg_cr_check_t *check, cr_cursor_t *cursor,
    uint8 dst_id)
{
    bool8 local_cr = g_dtc->profile.enable_rmo_cr || cursor->local_cr;

    if (local_cr) {
        check->head.dst_inst = check->head.src_inst;
    } else {
        check->head.dst_inst = dst_id;
    }
    check->local_cr = local_cr;

    DTC_DCS_DEBUG_INF("[PCR][%u-%u-%u][send check visible] query_scn %llu query_ssn %u "
        "src_inst %u src_sid %u dst_inst %u local_cr %u",
        (uint32)check->rowid.file, (uint32)check->rowid.page, (uint32)check->rowid.slot, check->query_scn,
        check->ssn, check->head.src_inst, check->head.src_sid, dst_id, check->local_cr);
    return (mes_send_data(check) == OG_SUCCESS) ? OG_SUCCESS : OG_ERROR;
}
/*
 * Reply to the original requester of a visibility check with the final result
 * (MES_CMD_CHECK_VISIBLE_ACK carrying is_found). Returns OG_ERROR if the send fails.
 */
status_t dcs_send_check_visible_ack(knl_session_t *session, msg_cr_check_t *check, bool32 is_found)
{
    msg_cr_check_ack_t ack;
    status_t send_ret = OG_SUCCESS;

    mes_init_ack_head(&check->head, &ack.head, MES_CMD_CHECK_VISIBLE_ACK, sizeof(msg_cr_check_ack_t), session->id);
    ack.is_found = is_found;
    DTC_DCS_DEBUG_INF("[PCR][%u-%u-%u][send check visible ack] is_found %u "
        "src_inst %u src_sid %u dst_inst %u dst_sid %u",
        (uint32)check->rowid.file, (uint32)check->rowid.page, (uint32)check->rowid.slot, (uint32)is_found,
        session->kernel->id, session->id, check->head.src_inst, check->head.src_sid);

    SYNC_POINT_GLOBAL_START(OGRAC_HEAP_CHECK_VISIBLE_ACK_FAIL, &send_ret, OG_ERROR);
    send_ret = mes_send_data(&ack);
    SYNC_POINT_GLOBAL_END;
    return (send_ret == OG_SUCCESS) ? OG_SUCCESS : OG_ERROR;
}
/*
 * Run a current-visibility check for the page image that follows the check header.
 *
 * Invisible ITLs are resolved one by one with local undo while their transaction belongs to this
 * instance and local CR is not requested. When no invisible ITL remains, or the row is found
 * invisible, the result is acked to the requester. When an ITL belongs to another instance (or
 * local CR is requested), the check is forwarded via dcs_send_check_visible.
 * Returns OG_ERROR on any fetch, undo or send failure.
 */
static status_t dcs_heap_check_visible(knl_session_t *session, msg_cr_check_t *check)
{
    heap_page_t *page = (heap_page_t *)((char *)check + sizeof(msg_cr_check_t));
    cr_cursor_t cursor;
    bool32 found = OG_TRUE;
    uint8 itl_inst;

    dcs_init_check_cursor(&cursor, check);
    while (pcrh_fetch_invisible_itl(session, &cursor, page) == OG_SUCCESS) {
        if (cursor.itl == NULL) {
            return dcs_send_check_visible_ack(session, check, found);
        }
        itl_inst = xid_get_inst_id(session, cursor.itl->xid);
        if (itl_inst != session->kernel->id || cursor.local_cr) {
            return dcs_send_check_visible(session, check, &cursor, itl_inst);
        }
        if (pcrh_chk_visible_with_undo_ss(session, &cursor, page, OG_FALSE, &found) != OG_SUCCESS) {
            return OG_ERROR;
        }
        if (!found) {
            return dcs_send_check_visible_ack(session, check, found);
        }
    }
    return OG_ERROR;
}
/*
 * Message handler for visibility checks (msg_cr_check_t followed by one page).
 *
 * A check whose source is this instance was routed back to its originator and is passed to the
 * waiting session via mes_process_msg_ack. Otherwise the size must match exactly. The check is
 * then run, and size or processing failures are answered with mes_send_error_msg before the
 * buffer is released.
 */
void dcs_process_check_visible(void *sess, mes_message_t *msg)
{
    knl_session_t *session = (knl_session_t *)sess;

    if (msg->head->src_inst == session->kernel->id) {
        msg->head->dst_sid = msg->head->src_sid;
        mes_process_msg_ack(sess, msg);
        return;
    }

    if (msg->head->size == sizeof(msg_cr_check_t) + DEFAULT_PAGE_SIZE(session)) {
        if (dcs_heap_check_visible(session, (msg_cr_check_t *)(msg->buffer)) != OG_SUCCESS) {
            mes_send_error_msg(msg->head);
        }
    } else {
        OG_LOG_RUN_ERR("process check visible msg size is invalid, msg size %u.", msg->head->size);
        mes_send_error_msg(msg->head);
    }
    mes_release_message_buf(msg->buffer);
}
/*
 * Clear the edp state of a dirty buffer ctrl that this instance does not own.
 *
 * If ctrl->is_edp is already cleared, this only logs. Otherwise the ctrl is removed from the
 * checkpoint queue (moving ckpt_ctx.batch_end back if it pointed at ctrl), and is_dirty,
 * is_remote_dirty, is_edp and edp_map are reset.
 *
 * @param session    kernel session
 * @param ctrl       buffer ctrl to clean; it is dereferenced unconditionally, so it must not be NULL
 * @param need_lock  when true, the ctrl's hash bucket spin lock is held for the whole operation
 */
void dcs_buf_clean_ctrl_edp(knl_session_t *session, buf_ctrl_t *ctrl, bool32 need_lock)
{
    ckpt_context_t *ogx = &session->kernel->ckpt_ctx;
    buf_set_t *set = &session->kernel->buf_ctx.buf_set[ctrl->buf_pool_id];
    buf_bucket_t *bucket = BUF_GET_BUCKET(set, ctrl->bucket_id);
    if (need_lock) {
        cm_spin_lock(&bucket->lock, &session->stat->spin_stat.stat_bucket);
    }
    if (!ctrl->is_edp) {
        /* NOTE(review): %lld assumes a signed 64-bit lsn - confirm against the page head type */
        DTC_DCS_DEBUG_INF("[DCS] edp page[%u-%u] (lsn:%lld) has already been cleaned.", ctrl->page_id.file,
            ctrl->page_id.page, ctrl->page->lsn);
        if (need_lock) {
            cm_spin_unlock(&bucket->lock);
        }
        return;
    }
    /* NOTE(review): the `ctrl &&` term cannot catch NULL - ctrl was already dereferenced above */
    knl_panic(ctrl && ctrl->is_dirty && ctrl->is_edp && !ctrl->is_readonly);
    knl_panic(DCS_BUF_CTRL_NOT_OWNER(session, ctrl));
    /* keep the checkpoint batch end valid before ctrl is unlinked from the queue */
    if (ctrl == ogx->batch_end) {
        ogx->batch_end = ogx->batch_end->ckpt_prev;
    }
    ckpt_pop_page(session, ogx, ctrl);
    /* full barrier: the queue removal above is ordered before the flag resets below */
    CM_MFENCE;
    ctrl->is_dirty = 0;
    ctrl->is_remote_dirty = 0;
    ctrl->is_edp = 0;
    ctrl->edp_map = 0;
    if (need_lock) {
        cm_spin_unlock(&bucket->lock);
    }
}
/*
 * Acknowledge a parameter-set broadcast with MES_CMD_BROADCAST_ACK carrying `status`.
 * The received buffer is released here, after the ack header has been built from it and before
 * sending. A send failure is only logged.
 */
static void dcs_send_arch_set_ack(mes_message_t *msg, status_t status)
{
    mes_message_head_t ack = { 0 };

    mes_init_ack_head(msg->head, &ack, MES_CMD_BROADCAST_ACK, sizeof(mes_message_head_t), OG_INVALID_ID16);
    ack.status = (int8)status;
    mes_release_message_buf(msg->buffer);

    if (mes_send_data(&ack) == OG_SUCCESS) {
        return;
    }
    OG_LOG_RUN_ERR("[DCS] failed to send arch set ack, status %d", status);
}
/*
 * Broadcast a parameter change (name, value, scope) to all instances as MES_CMD_ARCH_SET_REQ and
 * wait for their acks (up to MES_WAIT_MAX_TIME).
 *
 * The name is bounded by OG_NAME_BUFFER_SIZE and the value by OG_PARAM_BUFFER_SIZE. A copy failure
 * throws ERR_SYSTEM_CALL. Returns OG_ERROR if a copy or the broadcast fails.
 */
status_t dcs_alter_set_param(knl_session_t *session, const char *param, const char *value, config_scope_t scope)
{
    msg_arch_set_request_t req = { 0 };
    errno_t err;

    CM_POINTER3(session, param, value);
    req.scope = scope;

    err = strcpy_s(req.param, OG_NAME_BUFFER_SIZE, param);
    if (err == EOK) {
        err = strcpy_s(req.value, OG_PARAM_BUFFER_SIZE, value);
    }
    if (err != EOK) {
        OG_THROW_ERROR(ERR_SYSTEM_CALL, err);
        return OG_ERROR;
    }

    OG_LOG_RUN_INF("[DCS] request to arch set params, param %s, scope %u, value %s", req.param, req.scope, req.value);
    mes_init_send_head(&req.head, MES_CMD_ARCH_SET_REQ, sizeof(msg_arch_set_request_t), OG_INVALID_ID32,
        DCS_SELF_INSTID(session), 0, session->id, OG_INVALID_ID16);
    return (mes_broadcast_and_wait(session->id, MES_BROADCAST_ALL_INST, (void *)&req, MES_WAIT_MAX_TIME, NULL) ==
        OG_SUCCESS) ? OG_SUCCESS : OG_ERROR;
}
/*
 * Apply a parameter change broadcast by dcs_alter_set_param on this instance and ack the result.
 *
 * Steps:
 *  - check the message size;
 *  - check that the database is in MOUNT or OPEN state;
 *  - look up the config item;
 *  - run its notify hook (memory scope) or notify_pfile hook (CONFIG_SCOPE_DISK);
 *  - write the value with cm_alter_config. Read-only items are forced only in debug builds.
 * The ack carries OG_SUCCESS or OG_ERROR. dcs_send_arch_set_ack also releases the buffer.
 */
void dcs_process_arch_set_request(void *sess, mes_message_t *msg)
{
    OG_LOG_RUN_INF("[DCS] process request to arch set params");
    knl_session_t *session = (knl_session_t *)sess;
    msg_arch_set_request_t *req = (msg_arch_set_request_t *)msg->buffer;
    status_t status = OG_SUCCESS;
    database_t *db = &session->kernel->db;
    config_item_t *item = NULL;
    bool32 force = OG_TRUE;
    text_t name;

    /* a short message would make the field reads below go past the received data */
    if (msg->head->size < sizeof(msg_arch_set_request_t)) {
        OG_LOG_RUN_ERR("[DCS] arch set request is invalid, msg size %u.", msg->head->size);
        status = OG_ERROR;
        goto send_ack;
    }
    /*
     * The strings come from a remote instance. The sender bounds them to these sizes, so forcing a
     * terminator here leaves valid requests unchanged and keeps strlen/lookup inside the buffers.
     */
    req->param[OG_NAME_BUFFER_SIZE - 1] = '\0';
    req->value[OG_PARAM_BUFFER_SIZE - 1] = '\0';

    if (db->status != DB_STATUS_MOUNT && db->status != DB_STATUS_OPEN) {
        OG_THROW_ERROR_EX(ERR_SQL_SYNTAX_ERROR, "set param only work in mount or open state");
        status = OG_ERROR;
        goto send_ack;
    }
    name.str = req->param;
    name.len = (uint32)strlen(req->param);
    item = cm_get_config_item(GET_CONFIG, &name, OG_TRUE);
    if (item == NULL) {
        OG_THROW_ERROR(ERR_INVALID_PARAMETER_NAME, req->param);
        status = OG_ERROR;
        goto send_ack;
    }
    if (req->scope != CONFIG_SCOPE_DISK) {
        if (item->notify && item->notify((knl_handle_t)session, (void *)item, req->value)) {
            status = OG_ERROR;
            goto send_ack;
        }
    } else {
        if (item->notify_pfile && item->notify_pfile((knl_handle_t)session, (void *)item, req->value)) {
            status = OG_ERROR;
            goto send_ack;
        }
    }
    if (item->attr & ATTR_READONLY) {
#if defined(_DEBUG) || defined(DEBUG) || defined(DB_DEBUG_VERSION)
        force = OG_TRUE;
#else
        force = OG_FALSE;
#endif
    }
    if (cm_alter_config(session->kernel->attr.config, item->name, req->value, req->scope, force) != OG_SUCCESS) {
        status = OG_ERROR;
        goto send_ack;
    }
    OG_LOG_RUN_INF("[DCS] done request to arch set params");
send_ack:
    dcs_send_arch_set_ack(msg, status);
    return;
}