* Copyright (c) 2022 Huawei Technologies Co.,Ltd.
*
* DSS is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* dss_mes.c
*
*
* IDENTIFICATION
* src/service/dss_mes.c
*
* -------------------------------------------------------------------------
*/
#include "cm_types.h"
#include "cm_error.h"
#include "dss_malloc.h"
#include "dss_session.h"
#include "dss_file.h"
#include "dss_service.h"
#include "dss_instance.h"
#include "dss_api.h"
#include "dss_mes.h"
#include "dss_syn_meta.h"
#include "dss_thv.h"
#include "dss_fault_injection.h"
#include "dss_param_verify.h"
/*
 * Forward declarations of the request handlers that are referenced by the
 * g_dss_processors dispatch table before their definitions appear.
 * Every handler receives the session bound to the MES worker and the raw MES message.
 */
void dss_proc_broadcast_req(dss_session_t *session, mes_msg_t *msg);
void dss_proc_syb2active_req(dss_session_t *session, mes_msg_t *msg);
void dss_proc_loaddisk_req(dss_session_t *session, mes_msg_t *msg);
void dss_proc_join_cluster_req(dss_session_t *session, mes_msg_t *msg);
void dss_proc_refresh_ft_by_primary_req(dss_session_t *session, mes_msg_t *msg);
void dss_proc_get_ft_block_req(dss_session_t *session, mes_msg_t *msg);
/*
 * Handler shared by all acknowledgement commands in g_dss_processors.
 * It performs no processing; it only traces the command id and the
 * source/destination instances found in the DSS message header.
 */
void dss_proc_normal_ack(dss_session_t *session, mes_msg_t *msg)
{
    dss_message_head_t *head = (dss_message_head_t *)msg->buffer;

    LOG_DEBUG_INF("[MES] Receive ack(%u),src inst(%u), dst inst(%u).", (uint32)(head->dss_cmd),
        (uint32)(head->src_inst), (uint32)(head->dst_inst));
}
/*
 * Dispatch table for MES commands. dss_process_message indexes it with the
 * dss_cmd value taken from the message header after checking dss_cmd < DSS_CMD_CEIL.
 * Initializers are positional: handler, two boolean flags, MES priority and a
 * descriptive name. This file reads the members proc, is_req and prio_id;
 * NOTE(review): the exact member order is defined by dss_processor_t, which is not visible here.
 * Request commands use their dedicated handler; acknowledgement commands all map to dss_proc_normal_ack.
 */
dss_processor_t g_dss_processors[DSS_CMD_CEIL] = {
[DSS_CMD_REQ_BROADCAST] = {dss_proc_broadcast_req, CM_TRUE, CM_TRUE, MES_PRIORITY_ONE, "dss broadcast"},
[DSS_CMD_ACK_BROADCAST_WITH_MSG] = {dss_proc_normal_ack, CM_FALSE, CM_FALSE, MES_PRIORITY_ONE,
"dss broadcast ack with data"},
[DSS_CMD_REQ_SYB2ACTIVE] = {dss_proc_syb2active_req, CM_TRUE, CM_TRUE, MES_PRIORITY_ONE,
"dss standby to active req"},
[DSS_CMD_ACK_SYB2ACTIVE] = {dss_proc_normal_ack, CM_FALSE, CM_FALSE, MES_PRIORITY_ONE, "dss active to standby ack"},
/* Load-disk and get-ft-block traffic is placed on MES_PRIORITY_ZERO; everything else uses MES_PRIORITY_ONE. */
[DSS_CMD_REQ_LOAD_DISK] = {dss_proc_loaddisk_req, CM_TRUE, CM_TRUE, MES_PRIORITY_ZERO,
"dss standby load disk to active req"},
[DSS_CMD_ACK_LOAD_DISK] = {dss_proc_normal_ack, CM_FALSE, CM_FALSE, MES_PRIORITY_ZERO,
"dss active load disk to standby ack"},
[DSS_CMD_REQ_JOIN_CLUSTER] = {dss_proc_join_cluster_req, CM_TRUE, CM_TRUE, MES_PRIORITY_ONE,
"dss standby join in cluster to active req"},
[DSS_CMD_ACK_JOIN_CLUSTER] = {dss_proc_normal_ack, CM_FALSE, CM_FALSE, MES_PRIORITY_ONE,
"dss active proc join in cluster to standby ack"},
[DSS_CMD_REQ_REFRESH_FT] = {dss_proc_refresh_ft_by_primary_req, CM_TRUE, CM_TRUE, MES_PRIORITY_ONE,
"dss standby refresh ft by primary req"},
[DSS_CMD_ACK_REFRESH_FT] = {dss_proc_normal_ack, CM_FALSE, CM_FALSE, MES_PRIORITY_ONE,
"dss active proc refresh ft to standby ack"},
[DSS_CMD_REQ_GET_FT_BLOCK] = {dss_proc_get_ft_block_req, CM_TRUE, CM_TRUE, MES_PRIORITY_ZERO,
"dss standby get ft block req"},
[DSS_CMD_ACK_GET_FT_BLOCK] = {dss_proc_normal_ack, CM_FALSE, CM_FALSE, MES_PRIORITY_ZERO,
"dss active proc get ft block ack"},
};
/*
 * Look up the MES priority configured for a command in g_dss_processors.
 * The caller must pass a command id that is a valid index of the table;
 * no range check is performed here.
 */
static inline mes_priority_t dss_get_cmd_prio_id(dss_mes_command_t cmd)
{
    const dss_processor_t *processor = &g_dss_processors[cmd];
    return processor->prio_id;
}
/* Callback invoked with a successful remote-execution ack for the given session. */
typedef void (*dss_remote_ack_proc)(dss_session_t *session, dss_remote_exec_succ_ack_t *remote_ack);
/* One entry of the remote-ack handler table (g_dss_remote_ack_handle); proc is NULL when no handler is set. */
typedef struct st_dss_remote_ack_hdl {
dss_remote_ack_proc proc;
} dss_remote_ack_hdl_t;
/*
 * Post-process a successful remote ack of DSS_CMD_GET_FTID_BY_PATH.
 *
 * The ack body carries a uint32 followed by a dss_find_node_t; the node's
 * vg name and ftid are used to look the file-table node up locally through
 * dss_get_ft_node_by_ftid (its result is intentionally discarded).
 *
 * session    - session on whose behalf the lookup is done.
 * remote_ack - ack whose body_buf holds the dss_find_node_t payload.
 */
void dss_process_remote_ack_for_get_ftid_by_path(dss_session_t *session, dss_remote_exec_succ_ack_t *remote_ack)
{
    dss_find_node_t *ft_node = (dss_find_node_t *)(remote_ack->body_buf + sizeof(uint32));
    dss_vg_info_item_t *vg_item = dss_find_vg_item(ft_node->vg_name);
    /*
     * The vg name comes from a remote node. dss_find_vg_item presumably yields NULL
     * for an unknown name (TODO confirm); never hand a NULL vg item to the lookup below.
     */
    if (vg_item == NULL) {
        LOG_RUN_ERR("[MES] Failed to find vg when processing remote ack of get ftid by path.");
        return;
    }
    (void)dss_get_ft_node_by_ftid(session, vg_item, ft_node->ftid, CM_TRUE, CM_FALSE);
}
/*
 * Remote-ack handlers indexed by DSS_CMD_TYPE_OFFSET(cmd).
 * Only DSS_CMD_GET_FTID_BY_PATH has a handler; all other entries are zero-initialized (proc == NULL).
 */
static dss_remote_ack_hdl_t g_dss_remote_ack_handle[DSS_CMD_TYPE_OFFSET(DSS_CMD_END)] = {
[DSS_CMD_TYPE_OFFSET(DSS_CMD_GET_FTID_BY_PATH)] = {dss_process_remote_ack_for_get_ftid_by_path},
};
/*
 * Return the remote-ack table entry for cmd, or NULL when cmd lies outside
 * [DSS_CMD_BEGIN, DSS_CMD_END). The returned entry may still have proc == NULL.
 */
static inline dss_remote_ack_hdl_t *dss_get_remote_ack_handle(int32 cmd)
{
    if (cmd < DSS_CMD_BEGIN || cmd >= DSS_CMD_END) {
        return NULL;
    }
    return &g_dss_remote_ack_handle[DSS_CMD_TYPE_OFFSET(cmd)];
}
/*
 * Fill the broadcast context's ack buffer as a dss_ack_common_t:
 * store ret_val in cmd_ack and set ack_len to the size of the common ack.
 */
static inline void dss_set_ack_common(dss_bcast_context_t *bcast_ctx, bool32 ret_val)
{
    dss_ack_common_t *common_ack = (dss_ack_common_t *)bcast_ctx->ack_msg;

    common_ack->cmd_ack = ret_val;
    bcast_ctx->ack_len = sizeof(dss_ack_common_t);
}
/*
 * Broadcast request handler: ask dss_check_open_file_remote about the file
 * (vg name + ftid) named in the request and put the boolean answer into the common ack.
 *
 * Returns CM_ERROR when the request is shorter than dss_req_check_open_file_t,
 * the status of dss_check_open_file_remote when it fails (the ack is left untouched),
 * and CM_SUCCESS otherwise.
 */
status_t dss_process_check_open_file(dss_session_t *session, dss_bcast_context_t *bcast_ctx)
{
    if (bcast_ctx->req_len < sizeof(dss_req_check_open_file_t)) {
        LOG_RUN_ERR("[MES] invalid message req size %u", bcast_ctx->req_len);
        return CM_ERROR;
    }

    bool32 is_open = CM_FALSE;
    dss_req_check_open_file_t *request = (dss_req_check_open_file_t *)bcast_ctx->req_msg;
    status_t status = dss_check_open_file_remote(session, request->vg_name, request->ftid, &is_open);
    if (status == CM_SUCCESS) {
        dss_set_ack_common(bcast_ctx, is_open);
        return CM_SUCCESS;
    }
    return status;
}
/*
 * Broadcast request handler: apply a meta-invalidation request through
 * dss_invalidate_meta_remote and put the boolean answer into the common ack.
 *
 * The request arrives from another node, and the broadcast dispatcher only
 * guarantees that the broadcast request head is present. The fixed part of
 * dss_req_meta_data_t and the announced payload length are therefore validated
 * against req_len before data_size or data are used.
 *
 * Returns CM_ERROR for a truncated/inconsistent request, the status of
 * dss_invalidate_meta_remote when it fails, and CM_SUCCESS otherwise.
 */
status_t dss_process_invalidate_meta(dss_session_t *session, dss_bcast_context_t *bcast_ctx)
{
    dss_req_meta_data_t *req_ex = (dss_req_meta_data_t *)bcast_ctx->req_msg;
    /* Offsets are derived from member addresses only; nothing is read from the message yet. */
    uint32 data_off = (uint32)((char *)req_ex->data - (char *)req_ex);
    uint32 size_end = (uint32)((char *)&req_ex->data_size - (char *)req_ex) + (uint32)sizeof(req_ex->data_size);
    if (bcast_ctx->req_len < data_off || bcast_ctx->req_len < size_end) {
        LOG_RUN_ERR("[MES] invalid message req size %u", bcast_ctx->req_len);
        return CM_ERROR;
    }
    /* The payload announced by the sender must fit into what was actually received. */
    if (req_ex->data_size > bcast_ctx->req_len - data_off) {
        LOG_RUN_ERR("[MES] invalid meta data size %u, message req size %u", (uint32)req_ex->data_size,
            bcast_ctx->req_len);
        return CM_ERROR;
    }

    bool32 invalidate_ret = CM_FALSE;
    status_t ret = dss_invalidate_meta_remote(
        session, (dss_invalidate_meta_msg_t *)req_ex->data, req_ex->data_size, &invalidate_ret);
    if (ret != CM_SUCCESS) {
        return ret;
    }
    dss_set_ack_common(bcast_ctx, invalidate_ret);
    return CM_SUCCESS;
}
/*
 * Broadcast request handler: apply a meta-synchronization request through
 * dss_meta_syn_remote and put the boolean answer into the common ack.
 *
 * The request arrives from another node, and the broadcast dispatcher only
 * guarantees that the broadcast request head is present. The fixed part of
 * dss_req_meta_data_t and the announced payload length are therefore validated
 * against req_len before data_size or data are used.
 *
 * Returns CM_ERROR for a truncated/inconsistent request, the status of
 * dss_meta_syn_remote when it fails, and CM_SUCCESS otherwise.
 */
status_t dss_process_sync_meta(dss_session_t *session, dss_bcast_context_t *bcast_ctx)
{
    dss_req_meta_data_t *req_ex = (dss_req_meta_data_t *)bcast_ctx->req_msg;
    /* Offsets are derived from member addresses only; nothing is read from the message yet. */
    uint32 data_off = (uint32)((char *)req_ex->data - (char *)req_ex);
    uint32 size_end = (uint32)((char *)&req_ex->data_size - (char *)req_ex) + (uint32)sizeof(req_ex->data_size);
    if (bcast_ctx->req_len < data_off || bcast_ctx->req_len < size_end) {
        LOG_RUN_ERR("[MES] invalid message req size %u", bcast_ctx->req_len);
        return CM_ERROR;
    }
    /* The payload announced by the sender must fit into what was actually received. */
    if (req_ex->data_size > bcast_ctx->req_len - data_off) {
        LOG_RUN_ERR("[MES] invalid meta data size %u, message req size %u", (uint32)req_ex->data_size,
            bcast_ctx->req_len);
        return CM_ERROR;
    }

    bool32 sync_ret = CM_FALSE;
    status_t ret = dss_meta_syn_remote(session, (dss_meta_syn_t *)req_ex->data, req_ex->data_size, &sync_ret);
    if (ret != CM_SUCCESS) {
        return ret;
    }
    dss_set_ack_common(bcast_ctx, sync_ret);
    return CM_SUCCESS;
}
/*
 * Broadcast request handler: answer with this node's DSS_PROTO_VERSION.
 * Returns CM_ERROR (ack untouched) when the request is shorter than dss_req_common_t.
 */
status_t dss_process_get_version(dss_session_t *session, dss_bcast_context_t *bcast_ctx)
{
    if (bcast_ctx->req_len < sizeof(dss_req_common_t)) {
        LOG_RUN_ERR("[MES] invalid message req size %u", bcast_ctx->req_len);
        return CM_ERROR;
    }

    dss_ack_get_version_t *version_ack = (dss_ack_get_version_t *)bcast_ctx->ack_msg;
    version_ack->version = DSS_PROTO_VERSION;
    bcast_ctx->ack_len = sizeof(dss_ack_get_version_t);
    return CM_SUCCESS;
}
/*
 * Consume one peer's ack of the "check open file" broadcast.
 *
 * ack_head       - received ack, interpreted as dss_ack_common_t.
 * ack_msg_output - dss_bcast_ack_bool_t accumulator owned by the broadcaster;
 *                  cmd_ack is overwritten only when the peer's answer differs from default_ack.
 *
 * Returns ERR_DSS_MES_ILL for a too-short ack, the peer's result (after throwing
 * ERR_DSS_FILE_OPENING_REMOTE) when the peer reported failure, CM_SUCCESS otherwise.
 */
status_t dss_process_ack_check_open_file(dss_bcast_ack_head_t *ack_head, void *ack_msg_output)
{
    if (ack_head->dss_head.size < sizeof(dss_ack_common_t)) {
        DSS_THROW_ERROR(ERR_DSS_MES_ILL, "msg len is invalid");
        return ERR_DSS_MES_ILL;
    }

    dss_ack_common_t *peer_ack = (dss_ack_common_t *)ack_head;
    if (peer_ack->result != CM_SUCCESS) {
        DSS_THROW_ERROR(ERR_DSS_FILE_OPENING_REMOTE, ack_head->dss_head.src_inst, ack_head->dss_head.dss_cmd);
        return peer_ack->result;
    }

    dss_bcast_ack_bool_t *merged = (dss_bcast_ack_bool_t *)ack_msg_output;
    if (merged->default_ack == peer_ack->cmd_ack) {
        return CM_SUCCESS;
    }
    merged->cmd_ack = peer_ack->cmd_ack;
    return CM_SUCCESS;
}
/*
 * Consume one peer's ack of the "invalidate meta" broadcast.
 *
 * ack_head       - received ack, interpreted as dss_ack_common_t.
 * ack_msg_output - dss_bcast_ack_bool_t accumulator owned by the broadcaster;
 *                  cmd_ack is overwritten only when the peer's answer differs from default_ack.
 *
 * Returns ERR_DSS_MES_ILL for a too-short ack, the peer's result when the peer
 * reported failure (no error is thrown here), CM_SUCCESS otherwise.
 */
status_t dss_process_ack_invalidate_meta(dss_bcast_ack_head_t *ack_head, void *ack_msg_output)
{
    if (ack_head->dss_head.size < sizeof(dss_ack_common_t)) {
        DSS_THROW_ERROR(ERR_DSS_MES_ILL, "msg len is invalid");
        return ERR_DSS_MES_ILL;
    }

    dss_ack_common_t *peer_ack = (dss_ack_common_t *)ack_head;
    if (peer_ack->result != CM_SUCCESS) {
        return peer_ack->result;
    }

    dss_bcast_ack_bool_t *merged = (dss_bcast_ack_bool_t *)ack_msg_output;
    if (merged->default_ack == peer_ack->cmd_ack) {
        return CM_SUCCESS;
    }
    merged->cmd_ack = peer_ack->cmd_ack;
    return CM_SUCCESS;
}
/*
 * Consume one peer's ack of the "get version" broadcast.
 *
 * ack_msg_output is a dss_get_version_output_t accumulator: all_same is cleared
 * when the peer's version differs from DSS_PROTO_VERSION, and min_version is
 * lowered to the peer's version when that is smaller.
 *
 * Returns ERR_DSS_MES_ILL for a too-short ack, the peer's result when the peer
 * reported failure, CM_SUCCESS otherwise.
 */
status_t dss_process_ack_get_version(dss_bcast_ack_head_t *ack_head, void *ack_msg_output)
{
    if (ack_head->dss_head.size < sizeof(dss_ack_get_version_t)) {
        DSS_THROW_ERROR(ERR_DSS_MES_ILL, "msg len is invalid");
        return ERR_DSS_MES_ILL;
    }

    dss_ack_get_version_t *version_ack = (dss_ack_get_version_t *)ack_head;
    if (version_ack->bcast_head.dss_head.result != CM_SUCCESS) {
        return version_ack->bcast_head.dss_head.result;
    }

    dss_get_version_output_t *summary = (dss_get_version_output_t *)ack_msg_output;
    if (version_ack->version != DSS_PROTO_VERSION) {
        summary->all_same = DSS_FALSE;
    }
    if (version_ack->version < summary->min_version) {
        summary->min_version = version_ack->version;
    }
    return CM_SUCCESS;
}
/* Receiver-side handler of a broadcast request; fills bcast_ctx->ack_msg / ack_len on success. */
typedef status_t (*dss_bcast_proc_func)(dss_session_t *session, dss_bcast_context_t *bcast_ctx);
/* Sender-side handler of one peer's broadcast ack; merges the ack into ack_msg_output. */
typedef status_t (*dss_bcast_ack_proc_func)(dss_bcast_ack_head_t *ack_head, void *ack_msg_output);
/* Entry of g_dss_bcast_handle: how a broadcast request type is processed and acknowledged. */
typedef struct st_dss_bcast_hdl {
/* request handler; NULL means the request type is unsupported */
dss_bcast_proc_func proc_func;
/* whether an ack is sent back (receiver) and waited for (sender) */
bool32 need_ack;
/* ack type written into the ack head for this request type */
dss_bcast_ack_cmd_t ack_cmd;
} dss_bcast_hdl_t;
/* Entry of g_dss_bcast_ack_handle: handler for one broadcast ack type. */
typedef struct st_dss_bcast_ack_hdl {
dss_bcast_ack_proc_func proc_func;
} dss_bcast_ack_hdl_t;
/*
 * Broadcast request handlers indexed by request type (dss_bcast_req_cmd_t).
 * Each entry is {handler, need_ack, ack type}. BCAST_REQ_META_SYN is fire-and-forget:
 * it needs no ack and therefore carries BCAST_ACK_END as its ack type.
 */
static dss_bcast_hdl_t g_dss_bcast_handle[BCAST_REQ_END] = {
[BCAST_REQ_DEL_DIR_FILE] = {dss_process_check_open_file, DSS_TRUE, BCAST_ACK_DEL_FILE},
[BCAST_REQ_INVALIDATE_META] = {dss_process_invalidate_meta, DSS_TRUE, BCAST_ACK_INVALIDATE_META},
[BCAST_REQ_META_SYN] = {dss_process_sync_meta, DSS_FALSE, BCAST_ACK_END},
[BCAST_REQ_GET_VERSION] = {dss_process_get_version, DSS_TRUE, BCAST_ACK_GET_VERSION},
};
/*
 * Broadcast ack handlers indexed by ack type (dss_bcast_ack_cmd_t);
 * looked up by dss_proc_broadcast_ack_single. Unlisted types stay NULL.
 */
static dss_bcast_ack_hdl_t g_dss_bcast_ack_handle[BCAST_ACK_END] = {
[BCAST_ACK_DEL_FILE] = {dss_process_ack_check_open_file},
[BCAST_ACK_INVALIDATE_META] = {dss_process_ack_invalidate_meta},
[BCAST_ACK_GET_VERSION] = {dss_process_ack_get_version},
};
/*
 * Initialize a DSS message header.
 *
 * The first DSS_MES_MSG_HEAD_SIZE bytes of head are zeroed, then the routing
 * fields (src/dst instance, ruid), the command, the total message size and the
 * protocol versions are filled in. sw_proto_ver is always this binary's
 * DSS_PROTO_VERSION, msg_proto_ver is the negotiated version passed by the caller.
 * flags is OR-ed with the MES priority configured for cmd.
 */
static void dss_init_mes_head(dss_message_head_t *head, uint32 cmd, uint32 flags, uint16 src_inst, uint16 dst_inst,
    uint32 size, uint32 version, ruid_type ruid)
{
    (void)memset_s(head, DSS_MES_MSG_HEAD_SIZE, 0, DSS_MES_MSG_HEAD_SIZE);

    /* protocol versions */
    head->sw_proto_ver = DSS_PROTO_VERSION;
    head->msg_proto_ver = version;

    /* routing */
    head->src_inst = src_inst;
    head->dst_inst = dst_inst;
    head->ruid = ruid;

    /* payload description */
    head->dss_cmd = cmd;
    head->size = size;
    head->flags = flags | dss_get_cmd_prio_id(cmd);
}
/*
 * Map a broadcast request type to the ack type registered in g_dss_bcast_handle.
 * An out-of-range request type is logged and mapped to BCAST_ACK_END.
 */
static inline dss_bcast_ack_cmd_t dss_get_bcast_ack_cmd(dss_bcast_req_cmd_t bcast_op)
{
    if (bcast_op < ELEMENT_COUNT(g_dss_bcast_handle)) {
        return g_dss_bcast_handle[bcast_op].ack_cmd;
    }
    LOG_RUN_ERR("Invalid broadcast request type");
    return BCAST_ACK_END;
}
/*
 * Tell whether a broadcast request type expects an ack.
 * The request type must be a valid index of g_dss_bcast_handle; this is asserted.
 */
static inline bool32 dss_bcast_need_ack(dss_bcast_req_cmd_t bcast_op)
{
    DSS_ASSERT_LOG(bcast_op < ELEMENT_COUNT(g_dss_bcast_handle), "invalid bcast cmd %u", bcast_op);
    const dss_bcast_hdl_t *handler = &g_dss_bcast_handle[bcast_op];
    return handler->need_ack;
}
/*
 * Send the ack prepared in bcast_ctx back to the node that broadcast the request.
 *
 * The ack header is built from the request header with source and destination
 * swapped, the same ruid and message protocol version, and bcast_ctx->ack_len as size.
 * result is stored in the header so the requester can tell success from failure.
 * A send failure is only logged.
 */
static void dss_send_bcast_ack(dss_bcast_context_t *bcast_ctx, status_t result)
{
    dss_bcast_req_head_t *req_head = (dss_bcast_req_head_t *)bcast_ctx->req_msg;
    dss_bcast_ack_head_t *ack_head = (dss_bcast_ack_head_t *)bcast_ctx->ack_msg;

    /* The reply travels in the opposite direction of the request. */
    uint16 src_inst = req_head->dss_head.dst_inst;
    uint16 dst_inst = req_head->dss_head.src_inst;
    ruid_type ruid = req_head->dss_head.ruid;
    uint32 version = req_head->dss_head.msg_proto_ver;

    dss_init_mes_head(
        &ack_head->dss_head, DSS_CMD_ACK_BROADCAST_WITH_MSG, 0, src_inst, dst_inst, bcast_ctx->ack_len, version, ruid);
    ack_head->type = dss_get_bcast_ack_cmd(req_head->type);
    ack_head->dss_head.result = result;

    int ret = mes_send_response(dst_inst, ack_head->dss_head.flags, ruid, bcast_ctx->ack_msg, bcast_ctx->ack_len);
    if (ret == CM_SUCCESS) {
        DSS_LOG_DEBUG_OP("[MES] Succeed to send message, result: %u. cmd=%u, src_inst=%hhu, dst_inst=%hhu.", result,
            ack_head->type, ack_head->dss_head.src_inst, ack_head->dss_head.dst_inst);
        return;
    }
    LOG_DEBUG_ERR("[MES] send message failed, src inst(%hhu), dst inst(%hhu) ret(%d) ", src_inst, dst_inst, ret);
}
/*
 * Send a minimal ack that carries only the result code:
 * the ack length is forced to sizeof(dss_ack_common_t) before sending.
 */
static void dss_send_bcast_ack_common(dss_bcast_context_t *bcast_ctx, status_t ret)
{
    bcast_ctx->ack_len = sizeof(dss_ack_common_t);

    dss_send_bcast_ack(bcast_ctx, ret);
}
/*
 * Dispatch one broadcast ack to the handler registered for its type.
 * Returns ERR_DSS_UNSUPPORTED_CMD when the type is out of range or has no
 * handler, otherwise the handler's return value.
 */
int32 dss_proc_broadcast_ack_single(dss_bcast_ack_head_t *ack_head, void *ack_msg_output)
{
    dss_bcast_ack_proc_func handler = NULL;

    if (ack_head->type < ELEMENT_COUNT(g_dss_bcast_ack_handle)) {
        handler = g_dss_bcast_ack_handle[ack_head->type].proc_func;
    }
    if (handler == NULL) {
        return ERR_DSS_UNSUPPORTED_CMD;
    }
    return handler(ack_head, ack_msg_output);
}
/*
 * Reply to a request whose message protocol version cannot be handled here.
 *
 * A header-only ack with result ERR_DSS_VERSION_NOT_MATCH is sent to the
 * requester; version is the protocol version advertised in that ack. Broadcast
 * requests are answered with DSS_CMD_ACK_BROADCAST_WITH_MSG, every other
 * request with DSS_CMD_ACK_SYB2ACTIVE. The send outcome is only logged.
 */
static void dss_ack_version_not_match(dss_session_t *session, dss_message_head_t *req_head, uint32 version)
{
    dss_params_t *param = &dss_get_inst_cfg()->params;
    uint16 src_inst = (uint16)param->inst_id;
    uint16 dst_inst = req_head->src_inst;
    ruid_type ruid = req_head->ruid;

    uint32 cmd = DSS_CMD_ACK_SYB2ACTIVE;
    if (req_head->dss_cmd == DSS_CMD_REQ_BROADCAST) {
        cmd = DSS_CMD_ACK_BROADCAST_WITH_MSG;
    }

    dss_message_head_t ack_head;
    dss_init_mes_head(&ack_head, cmd, 0, src_inst, dst_inst, DSS_MES_MSG_HEAD_SIZE, version, ruid);
    ack_head.result = ERR_DSS_VERSION_NOT_MATCH;

    int ret = mes_send_response(dst_inst, ack_head.flags, ruid, (char *)&ack_head, DSS_MES_MSG_HEAD_SIZE);
    if (ret == CM_SUCCESS) {
        LOG_RUN_INF("send version not match message succeed, src inst(%hhu), dst inst(%hhu), ack msg version (%hhu)",
            src_inst, dst_inst, version);
        return;
    }
    LOG_DEBUG_ERR(
        "send version not match message failed, src inst(%hhu), dst inst(%hhu) ret(%d)", src_inst, dst_inst, ret);
}
/* Size of the on-stack buffer in which a broadcast ack is assembled. */
#define DSS_BCAST_ACK_MSG_LEN ((int)128)
/*
 * Entry point for DSS_CMD_REQ_BROADCAST messages.
 *
 * The request is dropped (no ack) when this node executes locally (dss_need_exec_local())
 * or when the message is shorter than a broadcast request head. Unknown or
 * unregistered request types are answered with ERR_DSS_UNSUPPORTED_CMD.
 * Otherwise the registered handler runs and, if the request type needs an ack,
 * its result is sent back to the requester.
 */
void dss_proc_broadcast_req(dss_session_t *session, mes_msg_t *msg)
{
    if (dss_need_exec_local()) {
        LOG_RUN_WAR("No need to solve broadcast msg when the current node is master.");
        return;
    }
    if (msg->size < sizeof(dss_bcast_req_head_t)) {
        LOG_RUN_ERR("[MES] invalid message req size %u", msg->size);
        return;
    }
    dss_bcast_req_head_t *req_head = (dss_bcast_req_head_t *)msg->buffer;
    char ack_msg[DSS_BCAST_ACK_MSG_LEN] = {0};
    dss_bcast_context_t bcast_ctx = {.req_msg = msg->buffer, .req_len = msg->size, .ack_msg = ack_msg, .ack_len = 0};
    if (req_head->type >= ELEMENT_COUNT(g_dss_bcast_handle)) {
        dss_send_bcast_ack_common(&bcast_ctx, ERR_DSS_UNSUPPORTED_CMD);
        return;
    }
    dss_bcast_hdl_t *boc_handler = &g_dss_bcast_handle[req_head->type];
    if (boc_handler->proc_func == NULL) {
        dss_send_bcast_ack_common(&bcast_ctx, ERR_DSS_UNSUPPORTED_CMD);
        return;
    }
    status_t ret = boc_handler->proc_func(session, &bcast_ctx);
    if (!boc_handler->need_ack) {
        return;
    }
    if (bcast_ctx.ack_len == 0) {
        /*
         * The handler returned before preparing an ack (its error paths leave ack_len
         * untouched). Sending a zero-length message would not even carry a header,
         * so fall back to the common ack, which transports the result code.
         */
        dss_send_bcast_ack_common(&bcast_ctx, ret);
        return;
    }
    dss_send_bcast_ack(&bcast_ctx, ret);
}
/*
 * Record the software protocol version reported by instance inst_id in
 * g_dss_instance.cluster_proto_vers.
 *
 * The slot is updated with a compare-and-swap that is retried until either the
 * swap succeeds or the slot is observed to already hold version.
 * An inst_id >= DSS_MAX_INSTANCES is logged and ignored.
 */
static void dss_set_cluster_proto_vers(uint8 inst_id, uint32 version)
{
    if (inst_id >= DSS_MAX_INSTANCES) {
        LOG_DEBUG_ERR("Invalid request inst_id:%hhu, version is %u.", inst_id, version);
        return;
    }

    for (;;) {
        uint32 seen = (uint32)cm_atomic32_get((atomic32_t *)&g_dss_instance.cluster_proto_vers[inst_id]);
        if (seen == version) {
            return;
        }
        bool32 swapped = cm_atomic32_cas(
            (atomic32_t *)&g_dss_instance.cluster_proto_vers[inst_id], (int32)seen, (int32)version);
        if (swapped) {
            return;
        }
    }
}
/*
 * Walk all collected broadcast responses.
 *
 * For every response the sender's software protocol version is recorded.
 * A response carrying ERR_DSS_VERSION_NOT_MATCH marks the sender in
 * community->version_not_match_inst and is otherwise skipped. Any other response
 * must be at least a dss_bcast_ack_head_t and is handed to
 * dss_proc_broadcast_ack_single; the first failure aborts the walk.
 *
 * Returns DSS_SUCCESS, ERR_DSS_MES_ILL for a too-short ack, or the handler's error.
 */
static int dss_proc_broadcast_ack_inner(
    mes_msg_list_t *responses, dss_bcast_community_t *community, void *ack_msg_output)
{
    for (uint32 idx = 0; idx < responses->count; idx++) {
        mes_msg_t *reply = &responses->messages[idx];
        dss_bcast_ack_head_t *ack_head = (dss_bcast_ack_head_t *)reply->buffer;
        uint32 from_inst = reply->src_inst;

        dss_set_cluster_proto_vers((uint8)from_inst, ack_head->dss_head.sw_proto_ver);

        if (ack_head->dss_head.result == ERR_DSS_VERSION_NOT_MATCH) {
            community->version_not_match_inst |= ((uint64)0x1 << from_inst);
            continue;
        }
        if (ack_head->dss_head.size < sizeof(dss_bcast_ack_head_t)) {
            DSS_THROW_ERROR(ERR_DSS_MES_ILL, "msg len is invalid");
            return ERR_DSS_MES_ILL;
        }

        int ret = dss_proc_broadcast_ack_single(ack_head, ack_msg_output);
        DSS_RETURN_IF_ERROR(ret);
    }
    return DSS_SUCCESS;
}
/* Hand every message of a broadcast response list back to MES via mes_release_msg. */
static void dss_release_broadcast_msg(mes_msg_list_t *responses)
{
    uint32 idx = 0;
    while (idx < responses->count) {
        mes_release_msg(&responses->messages[idx]);
        idx++;
    }
}
/*
 * Collect and process the responses of the broadcast identified by ruid.
 *
 * On success every responding instance is added to *succ_ack_inst, after which
 * the instances recorded in community->version_not_match_inst are removed from it.
 * The response messages are always released once they have been obtained.
 *
 * Returns the error of mes_broadcast_get_response or of the ack processing,
 * DSS_SUCCESS otherwise.
 */
static int dss_proc_broadcast_ack(
    ruid_type ruid, dss_bcast_community_t *community, uint64 *succ_ack_inst, void *ack_msg_output)
{
    mes_msg_list_t responses;
    int ret = mes_broadcast_get_response(ruid, &responses, community->timeout);
    if (ret != DSS_SUCCESS) {
        LOG_DEBUG_INF("[MES] Try broadcast get response failed, ret is %d, ruid is %llu.", ret, ruid);
        return ret;
    }

    ret = dss_proc_broadcast_ack_inner(&responses, community, ack_msg_output);
    if (ret != DSS_SUCCESS) {
        dss_release_broadcast_msg(&responses);
        LOG_DEBUG_INF("[MES] Try broadcast get response failed, ret is %d, ruid is %llu.", ret, ruid);
        return ret;
    }

    uint64 acked = *succ_ack_inst;
    for (uint32 idx = 0; idx < responses.count; idx++) {
        acked |= ((uint64)0x1 << responses.messages[idx].src_inst);
    }
    *succ_ack_inst = acked & (~community->version_not_match_inst);

    dss_release_broadcast_msg(&responses);
    return ret;
}
/*
 * Fetch the responses of a broadcast with a timeout of 0 and release them
 * immediately; used for broadcasts whose acks are not evaluated.
 * Nothing is released when fetching fails.
 */
static void dss_handle_discard_recv_broadcast_msg(ruid_type ruid)
{
    mes_msg_list_t responses;

    if (mes_broadcast_get_response(ruid, &responses, 0) != CM_SUCCESS) {
        return;
    }
    dss_release_broadcast_msg(&responses);
}
/*
 * Choose the protocol version for a broadcast.
 *
 * Considers the working instances that are not yet part of succ_inst and returns
 * the smallest version recorded for them in g_dss_instance.cluster_proto_vers,
 * capped at DSS_PROTO_VERSION. Instances whose recorded version is
 * DSS_INVALID_VERSION are skipped.
 */
uint32 dss_get_broadcast_proto_ver(uint64 succ_inst)
{
    uint64 working = dss_get_inst_work_status();
    uint64 pending = (~succ_inst & working);
    uint32 lowest = DSS_PROTO_VERSION;

    for (uint32 inst = 0; inst < DSS_MAX_INSTANCES; inst++) {
        uint64 bit = ((uint64)0x1 << inst);
        if ((pending & bit) == 0) {
            continue;
        }
        uint32 recorded = (uint32)cm_atomic32_get((atomic32_t *)&g_dss_instance.cluster_proto_vers[inst]);
        if (recorded != DSS_INVALID_VERSION) {
            lowest = MIN(lowest, recorded);
        }
    }
    return lowest;
}
/*
 * Expand an instance bitmap into a list of instance ids.
 *
 * valid_inst - bitmap tested with DSS_IS_INST_SEND for ids 0..DSS_MAX_INSTANCES-1.
 * arr        - output array receiving the selected ids in ascending order.
 * count      - capacity of arr; at most count ids are written, so a bitmap with
 *              more selected instances than count can no longer overrun arr.
 */
void dss_get_valid_inst(uint64 valid_inst, uint32 *arr, uint32 count)
{
    uint32 filled = 0;
    for (uint32 inst = 0; inst < DSS_MAX_INSTANCES && filled < count; inst++) {
        if (DSS_IS_INST_SEND(valid_inst, inst)) {
            arr[filled] = inst;
            filled++;
        }
    }
}
/* Maximum number of broadcast rounds attempted by dss_broadcast_msg. */
#define DSS_BROADCAST_MSG_TRY_MAX 5
/* Argument passed to cm_sleep between two rounds (presumably milliseconds - TODO confirm). */
#define DSS_BROADCAST_MSG_TRY_SLEEP_TIME 200
/*
 * Broadcast req to the other instances and, when the request type needs it,
 * collect their acks into ack_msg_output. Rounds are repeated up to
 * DSS_BROADCAST_MSG_TRY_MAX times for instances that did not answer successfully.
 *
 * community carries state across calls: succ_inst (instances already served),
 * version_not_match_inst (instances that answered ERR_DSS_VERSION_NOT_MATCH) and the ack timeout.
 *
 * Returns CM_SUCCESS when nothing has to be sent or all targeted instances answered,
 * ERR_DSS_VERSION_NOT_MATCH when a version mismatch was recorded (the record is cleared),
 * otherwise the last ack-processing result after the retries are exhausted.
 */
static status_t dss_broadcast_msg(dss_bcast_req_head_t *req, dss_bcast_community_t *community, void *ack_msg_output)
{
int32 ret = DSS_SUCCESS;
dss_config_t *inst_cfg = dss_get_inst_cfg();
dss_params_t *param = &inst_cfg->params;
uint64 succ_req_inst = 0;
/* NOTE(review): succ_ack_inst is never reset between rounds; dss_proc_broadcast_ack only ORs bits into it. */
uint64 succ_ack_inst = 0;
uint32 i = 0;
uint64 cur_work_inst_map = dss_get_inst_work_status();
/* Initially "failed" means: working instances not yet recorded in community->succ_inst. */
uint64 snd_err_inst_map = (~community->succ_inst & cur_work_inst_map);
uint64 last_inst_inst_map = 0;
uint64 new_added_inst_map = 0;
uint64 valid_inst = 0;
uint64 valid_inst_mask = 0;
do {
cm_reset_error();
/* Targets of this round: still-failed working instances plus instances that came online since the last round, */
/* restricted to configured nodes, excluding this instance and excluding version-mismatched instances. */
valid_inst_mask = ((cur_work_inst_map & snd_err_inst_map) | new_added_inst_map);
valid_inst = (param->nodes_list.inst_map) & (~((uint64)0x1 << (uint64)(param->inst_id))) & valid_inst_mask;
valid_inst = (~community->version_not_match_inst & valid_inst);
if (valid_inst == 0) {
/* Nothing left to send: report a recorded version mismatch once, then clear it. */
if (community->version_not_match_inst != 0) {
community->version_not_match_inst = 0;
return ERR_DSS_VERSION_NOT_MATCH;
}
LOG_DEBUG_INF("[MES] No inst need to broadcast.");
return CM_SUCCESS;
}
LOG_DEBUG_INF("[MES] Try broadcast num is %u, head cmd is %u.", i, req->type);
uint32 count = cm_bitmap64_count(valid_inst);
uint32 valid_inst_arr[DSS_MAX_INSTANCES] = {0};
dss_get_valid_inst(valid_inst, valid_inst_arr, count);
/* The send result is deliberately ignored; failures surface as missing acks below. */
(void)mes_broadcast_request_sp((inst_type *)valid_inst_arr, count, req->dss_head.flags, &req->dss_head.ruid,
(char *)req, req->dss_head.size);
succ_req_inst = valid_inst;
if (dss_bcast_need_ack(req->type)) {
ret = dss_proc_broadcast_ack(req->dss_head.ruid, community, &succ_ack_inst, ack_msg_output);
} else {
/* No ack expected: drain whatever responses exist and treat every target as acknowledged. */
dss_handle_discard_recv_broadcast_msg(req->dss_head.ruid);
ret = CM_SUCCESS;
succ_ack_inst = succ_req_inst;
}
uint64 succ_inst = valid_inst & succ_ack_inst;
LOG_DEBUG_INF(
"[MES] Try broadcast num is %u, valid_inst is %llu, succ_inst is %llu.", i, valid_inst, succ_inst);
if (succ_inst != 0) {
community->succ_inst = community->succ_inst | succ_inst;
}
/* Done when ack processing succeeded and the acknowledged set equals the targeted set. */
if (ret == CM_SUCCESS && succ_req_inst == succ_ack_inst) {
if (community->version_not_match_inst != 0) {
community->version_not_match_inst = 0;
return ERR_DSS_VERSION_NOT_MATCH;
}
return ret;
}
/* Prepare the next round: remember who failed and detect instances that became working meanwhile. */
snd_err_inst_map = valid_inst_mask & (~(succ_req_inst & succ_ack_inst));
last_inst_inst_map = cur_work_inst_map;
cur_work_inst_map = dss_get_inst_work_status();
new_added_inst_map = (~last_inst_inst_map & cur_work_inst_map);
cm_sleep(DSS_BROADCAST_MSG_TRY_SLEEP_TIME);
i++;
} while (i < DSS_BROADCAST_MSG_TRY_MAX);
/* Retries exhausted. ERR_DSS_UNSUPPORTED_CMD is returned as is, with the error already set elsewhere kept. */
if (ret != ERR_DSS_UNSUPPORTED_CMD) {
if (snd_err_inst_map != 0) {
/* Wait once more and re-check: failures of instances that are no longer working are ignored. */
cm_sleep(param->mes_wait_timeout);
uint64 online_inst_map = dss_get_inst_work_status();
uint64 still_fail_inst_map = snd_err_inst_map & online_inst_map;
if (still_fail_inst_map == 0) {
LOG_RUN_INF("[MES] broadcast failed inst is offline after recheck, ignore broadcast error.");
cm_reset_error();
return CM_SUCCESS;
}
}
cm_reset_error();
DSS_THROW_ERROR(ERR_DSS_MES_ILL, "Failed to broadcast msg with try.");
}
LOG_RUN_ERR("[DSS] THROW UP ERROR WHEN BROADCAST FAILED, errcode:%d", cm_get_error_code());
/* NOTE(review): ret may still be CM_SUCCESS here (last ack round succeeded but the sets differed) */
/* even though ERR_DSS_MES_ILL was just thrown - confirm that callers expect this. */
return ret;
}
/*
 * Decide whether a remote message may be processed right now.
 * Returns CM_TRUE when the instance status is DSS_STATUS_OPEN or the message
 * is a DSS_CMD_ACK_JOIN_CLUSTER; otherwise logs and returns CM_FALSE.
 */
static bool32 dss_check_srv_status(mes_msg_t *msg)
{
    dss_message_head_t *head = (dss_message_head_t *)(msg->buffer);

    if (g_dss_instance.status == DSS_STATUS_OPEN || head->dss_cmd == DSS_CMD_ACK_JOIN_CLUSTER) {
        return CM_TRUE;
    }
    LOG_DEBUG_INF("[MES] Could not exec remote req for the dssserver is not open or msg not join cluster, src "
                  "node:%u, wait try again.",
        (uint32)(head->src_inst));
    return CM_FALSE;
}
/*
 * Locate (and, on failure, build) the ack body inside the session's send packet.
 *
 * ret == CM_SUCCESS: the body already present after the packet head is used as is.
 * ret != CM_SUCCESS: the send packet is re-initialized for version and the
 *                    current error code and message are written into it.
 *
 * ack_buf  - receives the start of the ack body.
 * ack_size - receives the packet size minus the packet head.
 * Returns the error of dss_put_int32 / dss_put_str, CM_SUCCESS otherwise.
 */
static status_t dss_prepare_ack_msg(
    dss_session_t *session, status_t ret, char **ack_buf, uint32 *ack_size, uint32 version)
{
    dss_packet_t *send_pack = &session->send_pack;

    if (ret == CM_SUCCESS) {
        *ack_buf = send_pack->buf + sizeof(dss_packet_head_t);
    } else {
        int32 code;
        const char *message = NULL;

        dss_init_set(send_pack, version);
        *ack_buf = DSS_WRITE_ADDR(send_pack);
        cm_get_error(&code, &message);
        CM_RETURN_IFERR(dss_put_int32(send_pack, code));
        CM_RETURN_IFERR(dss_put_str(send_pack, message));
    }

    *ack_size = send_pack->head->size - sizeof(dss_packet_head_t);
    return CM_SUCCESS;
}
/*
 * Answer a remote request with command cmd and result ret.
 *
 * The ack body is prepared from the session's send packet (see dss_prepare_ack_msg);
 * header and body are then sent as two buffers through mes_send_response_x.
 * If preparing the body fails, nothing is sent. The send result is ignored.
 */
void dss_proc_remote_req_err(dss_session_t *session, dss_message_head_t *req_dss_head, unsigned char cmd, int32 ret)
{
    char *body = NULL;
    uint32 body_len = 0;

    if (dss_prepare_ack_msg(session, ret, &body, &body_len, req_dss_head->msg_proto_ver) != CM_SUCCESS) {
        LOG_DEBUG_ERR("The dssserver prepare ack msg failed, src node:%u, dst node:%u.", req_dss_head->src_inst,
            req_dss_head->dst_inst);
        return;
    }

    /* The reply travels in the opposite direction of the request. */
    uint16 reply_src = req_dss_head->dst_inst;
    uint16 reply_dst = req_dss_head->src_inst;
    ruid_type ruid = req_dss_head->ruid;

    dss_message_head_t ack;
    dss_init_mes_head(&ack, cmd, 0, reply_src, reply_dst, body_len + DSS_MES_MSG_HEAD_SIZE,
        req_dss_head->msg_proto_ver, ruid);
    ack.result = ret;
    (void)mes_send_response_x(reply_dst, ack.flags, ruid, 2, &ack, DSS_MES_MSG_HEAD_SIZE, body, body_len);
}
/*
 * Pre-checks run before a request handler is invoked.
 *
 * Non-broadcast requests are accepted only when this node executes locally
 * (dss_need_exec_local()) and get_instance_status_proc() reports DSS_STATUS_OPEN.
 * In addition the server status check (dss_check_srv_status) must pass.
 * Returns CM_SUCCESS when the request may be processed, CM_ERROR otherwise.
 */
static status_t dss_process_remote_req_prepare(dss_session_t *session, mes_msg_t *msg, dss_processor_t *processor)
{
    dss_message_head_t *head = (dss_message_head_t *)msg->buffer;
    dss_check_peer_by_inst(&g_dss_instance, head->src_inst);

    if (head->dss_cmd != DSS_CMD_REQ_BROADCAST) {
        bool32 can_exec = dss_need_exec_local() && (get_instance_status_proc() == DSS_STATUS_OPEN);
        if (!can_exec) {
            LOG_RUN_ERR("Proc msg cmd:%u from remote node:%u fail, can NOT exec here.", (uint32)head->dss_cmd,
                head->src_inst);
            return CM_ERROR;
        }
    }

    if (dss_check_srv_status(msg) == CM_TRUE) {
        return CM_SUCCESS;
    }
    LOG_RUN_WAR("Proc msg cmd:%u from remote node:%u fail, local status %u not open, wait try again.",
        (uint32)head->dss_cmd, head->src_inst, g_dss_instance.status);
    return CM_ERROR;
}
/*
 * Pre-check run before an ack handler is invoked: only the server status
 * check (dss_check_srv_status) has to pass.
 * Returns CM_SUCCESS when the ack may be processed, CM_ERROR otherwise.
 */
static status_t dss_process_remote_ack_prepare(dss_session_t *session, mes_msg_t *msg, dss_processor_t *processor)
{
    if (dss_check_srv_status(msg) == CM_TRUE) {
        return CM_SUCCESS;
    }

    dss_message_head_t *head = (dss_message_head_t *)msg->buffer;
    LOG_RUN_WAR("Proc msg cmd:%u from remote node:%u fail, local status %u not open, wait try again.",
        (uint32)head->dss_cmd, head->src_inst, g_dss_instance.status);
    return CM_ERROR;
}
/*
 * MES callback registered through mes_register_proc_func: validates an incoming
 * message and dispatches it to the handler in g_dss_processors.
 *
 * work_idx - index of the MES worker; selects the session in session_ctrl->sessions.
 * ruid     - request id supplied by MES (not used in this function).
 * msg      - received message; its buffer starts with a dss_message_head_t.
 *
 * Invalid messages are logged and dropped without an answer.
 */
static void dss_process_message(uint32 work_idx, ruid_type ruid, mes_msg_t *msg)
{
cm_reset_error();
DDES_FAULT_INJECTION_ACTION_TRIGGER_CUSTOM(
DSS_FI_MES_PROC_ENTER, cm_sleep(ddes_fi_get_entry_value(DDES_FI_TYPE_CUSTOM_FAULT)));
dss_config_t *inst_cfg = dss_get_inst_cfg();
/* MES sessions occupy the first channel_num + work_thread_cnt slots (see dss_create_mes_session). */
uint32 mes_sess_cnt = inst_cfg->params.channel_num + inst_cfg->params.work_thread_cnt;
if (work_idx >= mes_sess_cnt) {
cm_panic(0);
}
/* The transport-level size must at least cover the DSS header before the header is read. */
if (msg->size < DSS_MES_MSG_HEAD_SIZE) {
LOG_DEBUG_ERR("invalid message req size.");
return;
}
dss_message_head_t *dss_head = (dss_message_head_t *)msg->buffer;
LOG_DEBUG_INF("[MES] Proc msg cmd:%u, src node:%u, dst node:%u begin.", (uint32)(dss_head->dss_cmd),
(uint32)(dss_head->src_inst), (uint32)(dss_head->dst_inst));
dss_session_ctrl_t *session_ctrl = dss_get_session_ctrl();
dss_session_t *session = session_ctrl->sessions[work_idx];
status_t ret;
/* The size announced inside the header must be plausible as well. */
if (dss_head->size < DSS_MES_MSG_HEAD_SIZE) {
LOG_DEBUG_ERR("Invalid message size");
return;
}
/* Remember the sender's software protocol version for later version negotiation. */
dss_set_cluster_proto_vers((uint8)dss_head->src_inst, dss_head->sw_proto_ver);
/* A message encoded with a newer protocol than ours is rejected with a version-not-match ack. */
if (dss_head->msg_proto_ver > DSS_PROTO_VERSION) {
uint32 curr_proto_ver = MIN(dss_head->sw_proto_ver, DSS_PROTO_VERSION);
dss_ack_version_not_match(session, dss_head, curr_proto_ver);
return;
}
/* Bounds check before indexing g_dss_processors. */
if (dss_head->dss_cmd >= DSS_CMD_CEIL) {
LOG_DEBUG_ERR("Invalid request received,cmd is %u.", (uint8)dss_head->dss_cmd);
return;
}
/* Reset the session's packets and bind the session to the message's protocol version. */
dss_init_packet(&session->recv_pack, CM_FALSE);
dss_init_packet(&session->send_pack, CM_FALSE);
dss_init_set(&session->send_pack, dss_head->msg_proto_ver);
session->proto_version = dss_head->msg_proto_ver;
session->is_remote_req = CM_TRUE;
LOG_DEBUG_INF(
"[MES] dss process message, cmd is %u, proto_version is %u.", dss_head->dss_cmd, dss_head->msg_proto_ver);
dss_processor_t *processor = &g_dss_processors[dss_head->dss_cmd];
const char *error_message = NULL;
int32 error_code;
/* The handler runs while switch_latch is held; the latch is released on every exit path of the loop. */
while (CM_TRUE) {
cm_latch_s(&g_dss_instance.switch_latch, DSS_DEFAULT_SESSIONID, CM_FALSE, LATCH_STAT(LATCH_SWITCH));
if (processor->is_req) {
ret = dss_process_remote_req_prepare(session, msg, processor);
} else {
ret = dss_process_remote_ack_prepare(session, msg, processor);
}
if (ret != CM_SUCCESS) {
cm_unlatch(&g_dss_instance.switch_latch, LATCH_STAT(LATCH_SWITCH));
return;
}
processor->proc(session, msg);
cm_get_error(&error_code, &error_message);
/* A shared-memory lock timeout is treated as transient: drop the latch, clear the error and run again. */
if (error_code == ERR_DSS_SHM_LOCK_TIMEOUT) {
cm_unlatch(&g_dss_instance.switch_latch, LATCH_STAT(LATCH_SWITCH));
LOG_RUN_INF("Try again if error is shm lock timeout.");
cm_reset_error();
continue;
}
cm_unlatch(&g_dss_instance.switch_latch, LATCH_STAT(LATCH_SWITCH));
break;
}
LOG_DEBUG_INF("[MES] Proc msg cmd:%u, src node:%u, dst node:%u end.", (uint32)(dss_head->dss_cmd),
(uint32)(dss_head->src_inst), (uint32)(dss_head->dst_inst));
}
/* Register dss_process_message as the MES message callback. Always returns CM_SUCCESS. */
static status_t dss_register_proc(void)
{
    mes_register_proc_func(dss_process_message);

    return CM_SUCCESS;
}
/*
 * Describe the MES message pool in profile->msg_pool_attr.
 *
 * recv_msg_buf_size - total pool size.
 * profile           - MES profile; priority_cnt must already be set because the
 *                     per-priority attributes are filled for priority_cnt priorities.
 *
 * Four buffer pools are configured. All pools get DSS_MSG_BUFFER_QUEUE_NUM queues,
 * except the fourth (largest) pool, which gets DSS_MSG_FOURTH_BUFFER_QUEUE_NUM.
 * The fourth pool's share of the pool memory is its minimum size as reported by
 * mes_get_message_pool_minimum_info; the remainder is split between the other pools.
 *
 * Returns the error of mes_get_message_pool_minimum_info, CM_ERROR when the pool
 * cannot even hold the pool metadata, CM_SUCCESS otherwise.
 */
static status_t dss_set_mes_message_pool(unsigned long long recv_msg_buf_size, mes_profile_t *profile)
{
    LOG_DEBUG_INF("mes message pool size:%llu", recv_msg_buf_size);
    int ret = CM_SUCCESS;
    mes_msg_pool_attr_t *mpa = &profile->msg_pool_attr;
    mpa->total_size = recv_msg_buf_size;
    mpa->enable_inst_dimension = CM_FALSE;
    mpa->buf_pool_count = DSS_MSG_BUFFER_NO_CEIL;
    mpa->buf_pool_attr[DSS_MSG_BUFFER_NO_0].buf_size = DSS_FIRST_BUFFER_LENGTH;
    mpa->buf_pool_attr[DSS_MSG_BUFFER_NO_1].buf_size = DSS_SECOND_BUFFER_LENGTH;
    mpa->buf_pool_attr[DSS_MSG_BUFFER_NO_2].buf_size = DSS_THIRD_BUFFER_LENGTH;
    mpa->buf_pool_attr[DSS_MSG_BUFFER_NO_3].buf_size = DSS_FOURTH_BUFFER_LENGTH;

    /* Default queue count for every pool. */
    mes_msg_buffer_pool_attr_t *buf_pool_attr;
    for (uint8 buf_pool_no = 0; buf_pool_no < mpa->buf_pool_count; buf_pool_no++) {
        buf_pool_attr = &mpa->buf_pool_attr[buf_pool_no];
        buf_pool_attr->shared_pool_attr.queue_num = DSS_MSG_BUFFER_QUEUE_NUM;
        for (uint32 prio = 0; prio < profile->priority_cnt; prio++) {
            buf_pool_attr->priority_pool_attr[prio].queue_num = DSS_MSG_BUFFER_QUEUE_NUM;
        }
    }

    /*
     * The fourth pool has its own queue count. This override has to come after the
     * generic loop above; in the opposite order the loop silently overwrites it.
     */
    buf_pool_attr = &mpa->buf_pool_attr[DSS_MSG_BUFFER_NO_3];
    buf_pool_attr->shared_pool_attr.queue_num = DSS_MSG_FOURTH_BUFFER_QUEUE_NUM;
    for (uint32 prio = 0; prio < profile->priority_cnt; prio++) {
        buf_pool_attr->priority_pool_attr[prio].queue_num = DSS_MSG_FOURTH_BUFFER_QUEUE_NUM;
    }

    /* The largest message of every priority is bounded by the fourth pool's buffer size. */
    for (uint32 prio = 0; prio < profile->priority_cnt; prio++) {
        mpa->max_buf_size[prio] = mpa->buf_pool_attr[DSS_MSG_BUFFER_NO_3].buf_size;
    }

    mes_msg_pool_minimum_info_t minimum_info = {0};
    ret = mes_get_message_pool_minimum_info(profile, CM_FALSE, &minimum_info);
    if (ret != CM_SUCCESS) {
        LOG_RUN_ERR("[DSS] set mes message pool, get message pool minimum info failed");
        return ret;
    }

    /* Guard the ratio below against a zero or wrapped-around denominator. */
    if (mpa->total_size <= minimum_info.metadata_size) {
        LOG_RUN_ERR("[DSS] set mes message pool, pool size %llu is too small to hold the pool metadata",
            recv_msg_buf_size);
        return CM_ERROR;
    }

    /* Give the fourth pool exactly its minimum share (plus epsilon against rounding down). */
    double fourth_ratio = ((double)(minimum_info.buf_pool_minimum_size[DSS_MSG_BUFFER_NO_3]) /
                              (mpa->total_size - minimum_info.metadata_size)) +
                          DBL_EPSILON;
    mpa->buf_pool_attr[DSS_MSG_BUFFER_NO_3].proportion = fourth_ratio;
    double left_ratio = 1 - fourth_ratio;
    mpa->buf_pool_attr[DSS_MSG_BUFFER_NO_0].proportion = DSS_FIRST_BUFFER_RATIO * left_ratio;
    mpa->buf_pool_attr[DSS_MSG_BUFFER_NO_1].proportion = DSS_SECOND_BUFFER_RATIO * left_ratio;
    /* The third pool takes whatever is left so that all proportions add up to 1. */
    mpa->buf_pool_attr[DSS_MSG_BUFFER_NO_2].proportion =
        1 - (mpa->buf_pool_attr[DSS_MSG_BUFFER_NO_0].proportion + mpa->buf_pool_attr[DSS_MSG_BUFFER_NO_1].proportion +
                mpa->buf_pool_attr[DSS_MSG_BUFFER_NO_3].proportion);
    return CM_SUCCESS;
}
/*
 * Distribute the configured work threads over the two MES priorities.
 *
 * MES_PRIORITY_ZERO gets work_thread_cnt * DSS_WORK_THREAD_LOAD_DATA_PERCENT
 * threads (at least 1); MES_PRIORITY_ONE gets the remaining work threads.
 * Each priority gets DSS_RECV_WORK_THREAD_RATIO times its work threads as
 * receive tasks (at least 1). Messages are sent directly, so no send tasks are configured.
 */
static void dss_set_group_task_num(dss_config_t *dss_profile, mes_profile_t *mes_profile)
{
    uint32 total_workers = dss_profile->params.work_thread_cnt;

    uint32 load_meta_workers = (uint32)(total_workers * DSS_WORK_THREAD_LOAD_DATA_PERCENT);
    if (load_meta_workers == 0) {
        load_meta_workers = 1;
    }
    uint32 common_workers = (total_workers - load_meta_workers);

    mes_profile->send_directly = CM_TRUE;

    mes_profile->send_task_count[MES_PRIORITY_ZERO] = 0;
    mes_profile->send_task_count[MES_PRIORITY_ONE] = 0;

    mes_profile->work_task_count[MES_PRIORITY_ZERO] = load_meta_workers;
    mes_profile->work_task_count[MES_PRIORITY_ONE] = common_workers;

    mes_profile->recv_task_count[MES_PRIORITY_ZERO] =
        MAX(1, (uint32)(load_meta_workers * DSS_RECV_WORK_THREAD_RATIO));
    mes_profile->recv_task_count[MES_PRIORITY_ONE] =
        MAX(1, (uint32)(common_workers * DSS_RECV_WORK_THREAD_RATIO));
}
/*
 * Build the MES profile from the instance configuration.
 *
 * The profile is zeroed first. Then the instance identity, pipe/channel settings,
 * the network address of every instance present in nodes_list.inst_map, priority
 * and timeout settings, task counts and the message pool layout are filled in.
 * Returns CM_ERROR (with ERR_SYSTEM_CALL thrown) when copying a node address
 * fails, the error of dss_set_mes_message_pool, or CM_SUCCESS.
 */
static status_t dss_set_mes_profile(mes_profile_t *profile)
{
    errno_t errcode = memset_sp(profile, sizeof(mes_profile_t), 0, sizeof(mes_profile_t));
    securec_check_ret(errcode);

    dss_config_t *inst_cfg = dss_get_inst_cfg();
    dss_params_t *params = &inst_cfg->params;

    /* identity and transport */
    profile->inst_id = (uint32)params->inst_id;
    profile->pipe_type = (mes_pipe_type_t)params->pipe_type;
    profile->channel_cnt = params->channel_num;
    profile->conn_created_during_init = 0;
    profile->mes_elapsed_switch = params->elapsed_switch;
    profile->mes_with_ip = params->mes_with_ip;
    profile->ip_white_list_on = params->ip_white_list_on;
    profile->inst_cnt = params->nodes_list.inst_cnt;

    /* one address entry per configured instance, packed in ascending instance id order */
    uint32 slot = 0;
    for (uint32 inst = 0; inst < DSS_MAX_INSTANCES; inst++) {
        uint64_t inst_bit = ((uint64)0x1 << inst);
        if ((params->nodes_list.inst_map & inst_bit) == 0) {
            continue;
        }
        errcode = strncpy_s(profile->inst_net_addr[slot].ip, CM_MAX_IP_LEN, params->nodes_list.nodes[inst],
            strlen(params->nodes_list.nodes[inst]));
        if (errcode != EOK) {
            DSS_RETURN_IFERR2(CM_ERROR, DSS_THROW_ERROR(ERR_SYSTEM_CALL, (errcode)));
        }
        profile->inst_net_addr[slot].port = params->nodes_list.ports[inst];
        profile->inst_net_addr[slot].need_connect = CM_TRUE;
        profile->inst_net_addr[slot].inst_id = inst;
        slot++;
        /* stop as soon as the configured number of instances has been collected */
        if (slot == params->nodes_list.inst_cnt) {
            break;
        }
    }

    /* priorities, fragment size and timeouts */
    profile->priority_cnt = DSS_MES_PRIO_CNT;
    profile->frag_size = DSS_FOURTH_BUFFER_LENGTH;
    profile->max_wait_time = params->mes_wait_timeout;
    profile->connect_timeout = (int)CM_CONNECT_TIMEOUT;
    profile->socket_timeout = (int)CM_NETWORK_IO_TIMEOUT;

    dss_set_group_task_num(inst_cfg, profile);

    status_t status = dss_set_mes_message_pool(params->mes_pool_size, profile);
    if (status == CM_SUCCESS) {
        profile->tpool_attr.enable_threadpool = CM_FALSE;
    }
    return status;
}
/*
 * Reserve the leading session slots for MES.
 *
 * The number of reserved sessions is channel_num + work_thread_cnt. The
 * reservation is refused (CM_ERROR) when any session is already in use,
 * because MES must occupy the first slots. Runs under the session-control
 * spin lock.
 */
static status_t dss_create_mes_session(void)
{
    dss_config_t *cfg = dss_get_inst_cfg();
    dss_session_ctrl_t *ctrl = dss_get_session_ctrl();
    uint32 reserved = cfg->params.channel_num + cfg->params.work_thread_cnt;

    cm_spin_lock(&ctrl->lock, NULL);
    if (ctrl->used_count > 0) {
        DSS_RETURN_IFERR3(CM_ERROR,
            LOG_RUN_ERR("dss_create_mes_session failed, mes must occupy first %u sessions.", reserved),
            cm_spin_unlock(&ctrl->lock));
    }

    uint32 idx = 0;
    while (idx < reserved) {
        dss_session_t *mes_session = ctrl->sessions[idx];
        mes_session->is_direct = CM_TRUE;
        mes_session->is_closed = CM_FALSE;
        mes_session->is_used = CM_FALSE;
        idx++;
    }
    ctrl->used_count = reserved;
    cm_spin_unlock(&ctrl->lock);
    return CM_SUCCESS;
}
/*
 * Register the cross-node callbacks.
 * Nothing is registered in maintain mode or when at most one instance is
 * configured.
 */
void dss_mes_regist_other_proc()
{
    dss_config_t *cfg = dss_get_inst_cfg();
    if (g_dss_instance.is_maintain || cfg->params.nodes_list.inst_cnt <= 1) {
        return;
    }

    regist_remote_read_proc(dss_read_volume_remote);
    regist_invalidate_other_nodes_proc(dss_invalidate_other_nodes);
    regist_broadcast_check_file_open_proc(dss_broadcast_check_file_open);
    regist_refresh_ft_by_primary_proc(dss_refresh_ft_by_primary);
    regist_get_node_by_path_remote_proc(dss_get_node_by_path_remote);
    regist_meta_syn2other_nodes_proc(dss_syn_data2other_nodes);
}
/*
 * Start the MES layer: register message processors, reserve the MES
 * sessions, build the profile, register the cross-node callbacks and
 * finally call mes_init.
 *
 * Returns the status of the first step that fails, otherwise the result of
 * mes_init.
 */
status_t dss_startup_mes(void)
{
    status_t status = dss_register_proc();
    DSS_RETURN_IFERR2(status, LOG_RUN_ERR("dss_register_proc failed."));

    status = dss_create_mes_session();
    /* Report the step that actually failed (was logged as dss_set_mes_profile). */
    DSS_RETURN_IFERR2(status, LOG_RUN_ERR("dss_create_mes_session failed."));

    mes_profile_t profile;
    status = dss_set_mes_profile(&profile);
    DSS_RETURN_IFERR2(status, LOG_RUN_ERR("dss_set_mes_profile failed."));

    dss_notify_regist_mes_func((dss_regist_mes_func_t)dss_mes_regist_other_proc);
    dss_mes_regist_other_proc();
    return mes_init(&profile);
}
/* Shut down the MES layer by delegating to mes_uninit. */
void dss_stop_mes(void)
{
    mes_uninit();
    return;
}
/*
 * Broadcast a request to the other instances and wait for the result.
 *
 * req            - request; its dss_head is (re)initialized on every attempt.
 * req_size       - size written into the message head.
 * ack_msg_output - passed through to dss_broadcast_msg for the ack payload.
 *
 * In maintain mode nothing is sent and CM_SUCCESS is returned. When the
 * broadcast reports ERR_DSS_VERSION_NOT_MATCH the protocol version is
 * recomputed from the instances that already succeeded and the broadcast is
 * repeated. Any other failure is logged and returned.
 */
status_t dss_sync_bcast(dss_bcast_req_head_t *req, uint32 req_size, void *ack_msg_output)
{
    if (g_dss_instance.is_maintain) {
        return CM_SUCCESS;
    }
    dss_config_t *inst_cfg = dss_get_inst_cfg();
    dss_params_t *param = &inst_cfg->params;
    dss_bcast_community_t community = {0};
    community.broadcast_proto_ver = dss_get_broadcast_proto_ver(0);
    community.timeout = param->mes_wait_timeout;
    status_t ret;
    do {
        LOG_DEBUG_INF("[MES] notify other dss instance to do cmd %u.", req->type);
        dss_init_mes_head(&req->dss_head, DSS_CMD_REQ_BROADCAST, 0, (uint16)param->inst_id, CM_INVALID_ID16, req_size,
            community.broadcast_proto_ver, 0);
        ret = dss_broadcast_msg(req, &community, ack_msg_output);
        if (ret == ERR_DSS_VERSION_NOT_MATCH) {
            uint32 new_proto_ver = dss_get_broadcast_proto_ver(community.succ_inst);
            /* Versions are 32-bit values: print them with %u so they are neither truncated nor mismatched. */
            LOG_RUN_INF("[CHECK_PROTO]broadcast msg proto version has changed, old is %u, new is %u",
                (uint32)community.broadcast_proto_ver, (uint32)new_proto_ver);
            community.broadcast_proto_ver = new_proto_ver;
            community.version_not_match_inst = 0;
            continue;
        } else {
            break;
        }
    } while (CM_TRUE);
    if (ret != CM_SUCCESS) {
        LOG_RUN_ERR("[DSS]: Failed to notify other dss instance, cmd: %u, errcode:%d, "
                    "OS errno:%d, OS errmsg:%s.",
            req->type, cm_get_error_code(), errno, strerror(errno));
    }
    return ret;
}
/*
 * Ask the other instances whether the file identified by ftid in vg_item is
 * still open (broadcast type BCAST_REQ_DEL_DIR_FILE).
 *
 * cmd_ack - optional; receives the cmd_ack field of the broadcast reply.
 *
 * Returns CM_ERROR when the vg name cannot be copied, the broadcast status
 * on failure, otherwise CM_SUCCESS.
 */
status_t dss_bcast_ask_file_open(dss_vg_info_item_t *vg_item, uint64 ftid, bool32 *cmd_ack)
{
    dss_req_check_open_file_t req;
    /* The whole struct is sent to peers: clear it so no uninitialized stack bytes are transmitted. */
    (void)memset_s(&req, sizeof(dss_req_check_open_file_t), 0, sizeof(dss_req_check_open_file_t));
    req.ftid = ftid;
    req.bcast_head.type = BCAST_REQ_DEL_DIR_FILE;
    errno_t err = strncpy_s(req.vg_name, DSS_MAX_NAME_LEN, vg_item->vg_name, strlen(vg_item->vg_name));
    if (err != EOK) {
        DSS_THROW_ERROR(ERR_SYSTEM_CALL, err);
        return CM_ERROR;
    }
    LOG_DEBUG_INF("[MES] notify other dss instance to do cmd %u, ftid:%llu in vg:%s.", req.bcast_head.type, ftid,
        vg_item->vg_name);
    dss_bcast_ack_bool_t recv_msg = {.default_ack = DSS_FALSE, .cmd_ack = DSS_FALSE};
    DSS_RETURN_IF_ERROR(dss_sync_bcast((dss_bcast_req_head_t *)&req, sizeof(dss_req_check_open_file_t), &recv_msg));
    if (cmd_ack != NULL) {
        *cmd_ack = recv_msg.cmd_ack;
    }
    return CM_SUCCESS;
}
/*
 * Broadcast a metadata payload to the other instances.
 *
 * cmd     - broadcast request type stored in the request head.
 * data    - payload, copied into the request's data area.
 * size    - payload length; only the head plus size bytes are sent.
 * cmd_ack - optional; receives the cmd_ack field of the broadcast reply.
 *
 * Returns CM_ERROR when the payload does not fit, the broadcast status on
 * failure, otherwise CM_SUCCESS.
 */
status_t dss_bcast_meta_data(dss_bcast_req_cmd_t cmd, char *data, uint32 size, bool32 *cmd_ack)
{
    dss_req_meta_data_t meta_req;
    meta_req.bcast_head.type = cmd;
    meta_req.data_size = size;

    errno_t rc = memcpy_s(meta_req.data, sizeof(meta_req.data), data, size);
    if (rc != EOK) {
        DSS_THROW_ERROR(ERR_SYSTEM_CALL, rc);
        return CM_ERROR;
    }

    dss_bcast_ack_bool_t reply = {.default_ack = DSS_TRUE, .cmd_ack = DSS_TRUE};
    DSS_RETURN_IF_ERROR(
        dss_sync_bcast((dss_bcast_req_head_t *)&meta_req, OFFSET_OF(dss_req_meta_data_t, data) + size, &reply));

    if (cmd_ack == NULL) {
        return CM_SUCCESS;
    }
    *cmd_ack = reply.cmd_ack;
    return CM_SUCCESS;
}
/*
 * Broadcast a BCAST_REQ_GET_VERSION request; the reply is delivered through
 * get_version_output. Returns the status of dss_sync_bcast.
 */
status_t dss_bcast_get_protocol_version(dss_get_version_output_t *get_version_output)
{
    dss_req_common_t version_req;
    version_req.bcast_head.type = BCAST_REQ_GET_VERSION;

    status_t status =
        dss_sync_bcast((dss_bcast_req_head_t *)&version_req, sizeof(dss_req_common_t), get_version_output);
    return status;
}
/*
 * Broadcast meta_info as a BCAST_REQ_INVALIDATE_META request.
 * vg_item is not used by this function.
 */
status_t dss_invalidate_other_nodes(
    dss_vg_info_item_t *vg_item, char *meta_info, uint32 meta_info_size, bool32 *cmd_ack)
{
    status_t status = dss_bcast_meta_data(BCAST_REQ_INVALIDATE_META, meta_info, meta_info_size, cmd_ack);
    return status;
}
/* Callback-shaped wrapper around dss_bcast_ask_file_open. */
status_t dss_broadcast_check_file_open(dss_vg_info_item_t *vg_item, uint64 ftid, bool32 *cmd_ack)
{
    status_t status = dss_bcast_ask_file_open(vg_item, ftid, cmd_ack);
    return status;
}
/*
 * Broadcast meta_syn as a BCAST_REQ_META_SYN request.
 * vg_item is not used by this function.
 */
status_t dss_syn_data2other_nodes(dss_vg_info_item_t *vg_item, char *meta_syn, uint32 meta_syn_size, bool32 *cmd_ack)
{
    status_t status = dss_bcast_meta_data(BCAST_REQ_META_SYN, meta_syn, meta_syn_size, cmd_ack);
    return status;
}
/*
 * React to a change of one instance's bit in the work-status map:
 * connect when the old bit was clear, disconnect otherwise. An unchanged
 * bit is ignored. The results of the MES calls are deliberately discarded.
 */
static void dss_check_inst_conn(uint32_t id, uint64 old_inst_stat, uint64 cur_inst_stat)
{
    if (old_inst_stat == cur_inst_stat) {
        return;
    }
    if (old_inst_stat != 0) {
        (void)mes_disconnect_instance(id);
        return;
    }
    (void)mes_connect_instance(id);
}
/*
 * Record a new instance work-status map and adjust MES connections.
 *
 * When the map differs from the stored one, it is stored and every other
 * configured instance is passed to dss_check_inst_conn with its old and new
 * bit. The local instance and unconfigured ids are skipped.
 */
void dss_check_mes_conn(uint64 cur_inst_map)
{
    dss_config_t *cfg = dss_get_inst_cfg();
    uint64 prev_inst_map = dss_get_inst_work_status();
    if (prev_inst_map == cur_inst_map) {
        return;
    }
    dss_set_inst_work_status(cur_inst_map);

    uint32 visited = 0;
    for (uint32_t id = 0; id < DSS_MAX_INSTANCES; id++) {
        uint64_t bit = ((uint64)0x1 << id);
        if (id == cfg->params.inst_id || (cfg->params.nodes_list.inst_map & bit) == 0) {
            continue;
        }
        dss_check_inst_conn(id, (prev_inst_map & bit), (cur_inst_map & bit));
        visited++;
        if (visited == cfg->params.nodes_list.inst_cnt) {
            break;
        }
    }
}
/*
 * Pick the protocol version to use when talking to remoteid.
 *
 * Falls back to DSS_PROTO_VERSION for an out-of-range id or when no version
 * has been recorded for the instance; otherwise returns the smaller of the
 * recorded version and DSS_PROTO_VERSION.
 */
static uint32 dss_get_remote_proto_ver(uint32 remoteid)
{
    if (remoteid >= DSS_MAX_INSTANCES) {
        LOG_DEBUG_ERR("Invalid remote id:%u.", remoteid);
        return DSS_PROTO_VERSION;
    }

    uint32 recorded_ver = (uint32)cm_atomic32_get((atomic32_t *)&g_dss_instance.cluster_proto_vers[remoteid]);
    if (recorded_ver == DSS_INVALID_VERSION) {
        return DSS_PROTO_VERSION;
    }
    return MIN(recorded_ver, DSS_PROTO_VERSION);
}
/*
 * Wait for the response to ruid and validate its head.
 *
 * A response whose declared size is smaller than the DSS message head is
 * released and rejected with ERR_DSS_MES_ILL. For a valid response the
 * sender's protocol version is recorded. On success the caller owns the
 * message in *response.
 */
static int dss_get_mes_response(ruid_type ruid, mes_msg_t *response, int timeout_ms)
{
    int ret = mes_get_response(ruid, response, timeout_ms);
    if (ret != CM_SUCCESS) {
        return ret;
    }

    dss_message_head_t *head = (dss_message_head_t *)response->buffer;
    if (head->size < DSS_MES_MSG_HEAD_SIZE) {
        LOG_RUN_ERR("Invalid message size");
        DSS_THROW_ERROR(ERR_DSS_MES_ILL, "msg len is invalid");
        mes_release_msg(response);
        return ERR_DSS_MES_ILL;
    }
    dss_set_cluster_proto_vers((uint8)head->src_inst, head->sw_proto_ver);
    return ret;
}
/*
 * Forward the request held in session->recv_pack to instance remoteid
 * (DSS_CMD_REQ_SYB2ACTIVE) and wait for the reply.
 *
 * session       - session whose recv_pack is forwarded and whose send_pack
 *                 receives the reply body.
 * remoteid      - destination instance id.
 * currtid       - source (local) instance id written into the message head.
 * remote_result - receives the result field of the reply head, or
 *                 ERR_DSS_VERSION_NOT_MATCH when the client must change its
 *                 protocol version.
 *
 * Returns the send/receive status on transport failure; otherwise the
 * status left in ret by the receive step or by dss_put_data.
 */
status_t dss_exec_sync(dss_session_t *session, uint32 remoteid, uint32 currtid, status_t *remote_result)
{
status_t ret = CM_ERROR;
dss_message_head_t dss_head;
mes_msg_t msg;
dss_message_head_t *ack_head = NULL;
dss_config_t *inst_cfg = dss_get_inst_cfg();
uint32 timeout = inst_cfg->params.mes_wait_timeout;
uint32 new_proto_ver = dss_get_version(&session->recv_pack);
/* Send/receive loop: repeated only when the peer answers with a version mismatch that can be resolved locally. */
do {
uint32 buf_size = DSS_MES_MSG_HEAD_SIZE + session->recv_pack.head->size;
dss_init_mes_head(
&dss_head, DSS_CMD_REQ_SYB2ACTIVE, 0, (uint16)currtid, (uint16)remoteid, buf_size, new_proto_ver, 0);
/* Two-part send: the MES head followed by the client packet. */
ret = mes_send_request_x(dss_head.dst_inst, dss_head.flags, &dss_head.ruid, 2, &dss_head, DSS_MES_MSG_HEAD_SIZE,
session->recv_pack.buf, session->recv_pack.head->size);
char *err_msg = "The dss server fails to send messages to the remote node";
DSS_RETURN_IFERR2(ret, LOG_RUN_ERR("%s, src node(%u), dst node(%u).", err_msg, currtid, remoteid));
ret = dss_get_mes_response(dss_head.ruid, &msg, timeout);
DSS_RETURN_IFERR2(
ret, LOG_RUN_ERR("dss server receive msg from remote failed, src node:%u, dst node:%u, cmd:%u.", currtid,
remoteid, session->recv_pack.head->cmd));
/* From here on msg is held and must be released on every exit path. */
ack_head = (dss_message_head_t *)msg.buffer;
if (ack_head->result == ERR_DSS_VERSION_NOT_MATCH) {
/* Negotiate: smallest of the peer's version, the local version and the client's version. */
session->client_version = dss_get_client_version(&session->recv_pack);
new_proto_ver = MIN(ack_head->sw_proto_ver, DSS_PROTO_VERSION);
new_proto_ver = MIN(new_proto_ver, session->client_version);
session->proto_version = new_proto_ver;
if (session->proto_version != dss_get_version(&session->recv_pack)) {
/* The client packet itself carries a different version: report the mismatch instead of resending. */
LOG_RUN_INF("[CHECK_PROTO]The client protocol version need be changed, old protocol version is %u, new "
"protocol version is %u",
dss_get_version(&session->recv_pack), session->proto_version);
DSS_THROW_ERROR(
ERR_DSS_VERSION_NOT_MATCH, dss_get_version(&session->recv_pack), session->proto_version);
*remote_result = ERR_DSS_VERSION_NOT_MATCH;
mes_release_msg(&msg);
/* NOTE(review): ret still holds the status of the receive step here; the mismatch is reported through *remote_result. */
return ret;
} else {
/* Same version as the client packet: resend; the head is rebuilt with new_proto_ver at the top of the loop. */
dss_head.msg_proto_ver = new_proto_ver;
mes_release_msg(&msg);
continue;
}
} else {
break;
}
} while (CM_TRUE);
*remote_result = ack_head->result;
/* dss_get_mes_response has already rejected replies shorter than the head, so this cannot underflow. */
uint32 body_size = ack_head->size - DSS_MES_MSG_HEAD_SIZE;
if (*remote_result != CM_SUCCESS) {
/* A failure reply must be large enough to carry the error code and message. */
if (ack_head->size < sizeof(dss_remote_exec_fail_ack_t)) {
DSS_RETURN_IFERR3(CM_ERROR, DSS_THROW_ERROR(ERR_DSS_MES_ILL, "msg len is invalid"), mes_release_msg(&msg));
}
dss_remote_exec_fail_ack_t *fail_ack = (dss_remote_exec_fail_ack_t *)msg.buffer;
DSS_THROW_ERROR(ERR_DSS_PROCESS_REMOTE, fail_ack->err_code, fail_ack->err_msg);
} else if (body_size > 0) {
dss_remote_exec_succ_ack_t *succ_ack = (dss_remote_exec_succ_ack_t *)msg.buffer;
LOG_DEBUG_INF("[MES] dss server receive msg from remote node, cmd:%u, ack to cli data size:%u.",
session->recv_pack.head->cmd, body_size);
/* Optional per-command hook runs before the body is copied into the client reply. */
dss_remote_ack_hdl_t *handle = dss_get_remote_ack_handle(session->recv_pack.head->cmd);
if (handle != NULL) {
handle->proc(session, succ_ack);
}
ret = dss_put_data(&session->send_pack, succ_ack->body_buf, body_size);
}
mes_release_msg(&msg);
return ret;
}
/*
 * Execute cmd on the remote node chosen by dss_get_exec_nodeid and copy the
 * reply into ack.
 *
 * cmd           - DSS MES command to send.
 * req           - request buffer; starts with a dss_message_head_t that this
 *                 function initializes.
 * req_size      - total request size written into the head.
 * ack           - buffer receiving the whole reply on success.
 * ack_size      - expected reply size; a success reply of any other size is
 *                 rejected.
 * remote_result - receives the result field of the reply head.
 *
 * A temporary session is created for the call and destroyed on every exit
 * path. The request is resent while the peer answers with
 * ERR_DSS_VERSION_NOT_MATCH.
 *
 * Returns CM_ERROR when the session cannot be created or the reply is
 * malformed, the status of dss_get_exec_nodeid when node lookup fails,
 * ERR_DSS_MES_ILL on send/receive failure, otherwise the status of the last
 * step.
 */
status_t dss_exec_on_remote(uint8 cmd, char *req, int32 req_size, char *ack, int ack_size, status_t *remote_result)
{
    status_t ret = CM_ERROR;
    dss_message_head_t *dss_head = (dss_message_head_t *)req;
    dss_message_head_t *ack_head = NULL;
    dss_session_t *session = NULL;
    uint32 remoteid = DSS_INVALID_ID32;
    uint32 currid = DSS_INVALID_ID32;
    dss_config_t *inst_cfg = dss_get_inst_cfg();
    uint32 timeout = inst_cfg->params.mes_wait_timeout;
    mes_msg_t msg;
    if (dss_create_session(NULL, &session) != CM_SUCCESS) {
        LOG_RUN_ERR("Exec cmd:%u on remote node create session fail.", (uint32)cmd);
        return CM_ERROR;
    }
    /* Release the temporary session when node lookup fails; it used to leak here. */
    ret = dss_get_exec_nodeid(session, &currid, &remoteid);
    if (ret != CM_SUCCESS) {
        dss_destroy_session(session);
        return ret;
    }
    LOG_DEBUG_INF("[MES] Exec cmd:%u on remote node:%u begin.", (uint32)cmd, remoteid);
    do {
        /* The version is re-read on every attempt so a retry uses what the last reply recorded. */
        uint32 proto_ver = dss_get_remote_proto_ver(remoteid);
        dss_init_mes_head(dss_head, cmd, 0, (uint16)currid, (uint16)remoteid, req_size, proto_ver, 0);
        ret = mes_send_request(remoteid, dss_head->flags, &dss_head->ruid, req, dss_head->size);
        if (ret != CM_SUCCESS) {
            LOG_RUN_ERR("Exec cmd:%u on remote node:%u send msg fail.", (uint32)cmd, remoteid);
            dss_destroy_session(session);
            return ERR_DSS_MES_ILL;
        }
        ret = dss_get_mes_response(dss_head->ruid, &msg, timeout);
        if (ret != CM_SUCCESS) {
            LOG_RUN_ERR("Exec cmd:%u on remote node:%u recv msg fail.", (uint32)cmd, remoteid);
            dss_destroy_session(session);
            return ERR_DSS_MES_ILL;
        }
        ack_head = (dss_message_head_t *)msg.buffer;
        if (ack_head->result == ERR_DSS_VERSION_NOT_MATCH) {
            mes_release_msg(&msg);
            continue;
        }
        break;
    } while (CM_TRUE);
    *remote_result = ack_head->result;
    LOG_DEBUG_INF("[MES] dss server receive msg from remote node, cmd:%u, ack to cli data size:%hu, remote_result:%u.",
        ack_head->dss_cmd, ack_head->size, (uint32)*remote_result);
    if (*remote_result != CM_SUCCESS) {
        /* A failure reply must be large enough to carry the error code and message. */
        if (ack_head->size < sizeof(dss_remote_exec_fail_ack_t)) {
            DSS_THROW_ERROR(ERR_DSS_MES_ILL, "msg len is invalid");
            DSS_RETURN_IFERR3(CM_ERROR, dss_destroy_session(session), mes_release_msg(&msg));
        }
        dss_remote_exec_fail_ack_t *fail_ack = (dss_remote_exec_fail_ack_t *)msg.buffer;
        DSS_THROW_ERROR(ERR_DSS_PROCESS_REMOTE, fail_ack->err_code, fail_ack->err_msg);
    } else {
        /* A success reply must match the caller's expected size exactly. */
        if (ack_head->size != ack_size) {
            DSS_THROW_ERROR(ERR_DSS_MES_ILL, "msg len is invalid");
            DSS_RETURN_IFERR3(CM_ERROR, dss_destroy_session(session), mes_release_msg(&msg));
        }
        errno_t err = memcpy_s(ack, (size_t)ack_size, msg.buffer, (size_t)ack_head->size);
        if (err != EOK) {
            CM_THROW_ERROR(ERR_SYSTEM_CALL, err);
            ret = CM_ERROR;
        }
    }
    mes_release_msg(&msg);
    dss_destroy_session(session);
    LOG_DEBUG_INF("[MES] Exec cmd:%u on remote node:%u end.", (uint32)cmd, remoteid);
    return ret;
}
/*
 * Processor for a forwarded client request: copy the client packet out of
 * the message into session->recv_pack, run dss_proc_standby_req on it and
 * send the outcome back to the sender as DSS_CMD_ACK_SYB2ACTIVE.
 *
 * Every early return below leaves without sending a response.
 */
void dss_proc_syb2active_req(dss_session_t *session, mes_msg_t *msg)
{
/* The head is copied by value so its fields are kept in locals for the rest of the function. */
dss_message_head_t req_head = *(dss_message_head_t *)(msg->buffer);
/* NOTE(review): a head size below DSS_MES_MSG_HEAD_SIZE would wrap to a large value here; presumably that is then caught by the DSS_MAX_PACKET_SIZE check below - confirm. */
uint32 size = req_head.size - DSS_MES_MSG_HEAD_SIZE;
uint16 srcid = req_head.src_inst;
uint16 dstid = req_head.dst_inst;
ruid_type ruid = req_head.ruid;
if (size > DSS_MAX_PACKET_SIZE) {
LOG_DEBUG_ERR(
"The dss server receive msg from remote failed, src node:%u, dst node:%u, size is %u.", srcid, dstid, size);
return;
}
LOG_DEBUG_INF("[MES] The dss server receive messages from remote node, src node:%u, dst node:%u.", srcid, dstid);
/* Body (everything after the MES head) becomes the session's received packet. */
errno_t errcode = memcpy_s(session->recv_pack.buf, size, msg->buffer + DSS_MES_MSG_HEAD_SIZE, size);
if (errcode != EOK) {
LOG_DEBUG_ERR("The dss server memcpy msg failed, src node:%u, dst node:%u.", srcid, dstid);
return;
}
status_t ret = dss_proc_standby_req(session);
char *body_buf = NULL;
uint32 body_size = 0;
/* The processing result (success or failure) is handed to dss_prepare_ack_msg, which supplies the ack body. */
status_t status = dss_prepare_ack_msg(session, ret, &body_buf, &body_size, req_head.msg_proto_ver);
if (status != CM_SUCCESS) {
LOG_DEBUG_ERR("The dss server prepare ack msg failed, src node:%u, dst node:%u.", srcid, dstid);
return;
}
LOG_DEBUG_INF(
"[MES] The dss server send messages to the remote node, src node:%u, dst node:%u, cmd:%u,ack size:%u.", srcid,
dstid, session->send_pack.head->cmd, body_size);
dss_message_head_t ack;
/* Source and destination are swapped for the reply; the request's ruid and protocol version are reused. */
dss_init_mes_head(
&ack, DSS_CMD_ACK_SYB2ACTIVE, 0, dstid, srcid, body_size + DSS_MES_MSG_HEAD_SIZE, req_head.msg_proto_ver, ruid);
ack.result = ret;
/* Two-part send: the ack head followed by the prepared body. ret is reused for the send status. */
ret = mes_send_response_x(ack.dst_inst, ack.flags, ack.ruid, 2, &ack, DSS_MES_MSG_HEAD_SIZE, body_buf, body_size);
if (ret != CM_SUCCESS) {
LOG_DEBUG_ERR("The dss server fails to send messages to the remote node, src node:%u, dst node:%u.",
(uint32)(ack.src_inst), (uint32)(ack.dst_inst));
return;
}
LOG_DEBUG_INF("[MES] The dss server send messages to the remote node success, src node:%u, dst node:%u.",
(uint32)(ack.src_inst), (uint32)(ack.dst_inst));
}
/*
 * Send one load-disk fragment: the control block ack followed by the data in
 * buf. The data length is the head's size minus the control block size.
 * Returns the status of mes_send_response_x.
 */
status_t dss_send2standby(big_packets_ctrl_t *ack, const char *buf)
{
    dss_message_head_t *head = &ack->dss_head;
    status_t ret = mes_send_response_x(head->dst_inst, head->flags, head->ruid, 2, ack,
        sizeof(big_packets_ctrl_t), buf, head->size - sizeof(big_packets_ctrl_t));
    if (ret == CM_SUCCESS) {
        LOG_DEBUG_INF("[MES] The dss server send messages to the remote node success, src node:%u, dst node:%u.",
            (uint32)(head->src_inst), (uint32)(head->dst_inst));
    } else {
        LOG_RUN_ERR("The dssserver fails to send messages to the remote node, src node:%u, dst node:%u.",
            (uint32)(head->src_inst), (uint32)(head->dst_inst));
    }
    return ret;
}
/*
 * Read req->size bytes of a volume in chunks of at most
 * DSS_LOADDISK_BUFFER_SIZE and send every chunk to the requester.
 *
 * read_buff is the staging buffer for one chunk. Each chunk is sent with a
 * control block carrying its sequence number, offset, size and an end flag
 * on the last chunk.
 *
 * Returns CM_SUCCESS, CM_ERROR when the session is closed or a send fails,
 * or DSS_READ4STANDBY_ERR when the volume read fails.
 */
static int32 dss_batch_load_core(dss_session_t *session, dss_loaddisk_req_t *req, char *read_buff, uint32 version)
{
    dss_message_head_t *req_head = &req->dss_head;
    big_packets_ctrl_t ctrl;
    (void)memset_s(&ctrl, sizeof(big_packets_ctrl_t), 0, sizeof(big_packets_ctrl_t));
    dss_init_mes_head(&ctrl.dss_head, DSS_CMD_ACK_LOAD_DISK, 0, req_head->dst_inst, req_head->src_inst,
        sizeof(big_packets_ctrl_t), version, req_head->ruid);
    ctrl.totalsize = req->size;

    uint32 done = 0;
    uint32 left = req->size;
    while (left > 0) {
        if (session && session->is_closed) {
            LOG_RUN_ERR("session:%u is closed.", session->id);
            return CM_ERROR;
        }

        uint32 chunk;
        if (left <= DSS_LOADDISK_BUFFER_SIZE) {
            chunk = left;
        } else {
            chunk = DSS_LOADDISK_BUFFER_SIZE;
        }
        uint64 read_pos = req->offset + done;
        if (dss_read_volume_4standby(req->vg_name, req->volumeid, (int64)read_pos, read_buff, chunk) != CM_SUCCESS) {
            LOG_RUN_ERR("read volume for standby failed, vg name[%s], volume id[%u].", req->vg_name, req->volumeid);
            return DSS_READ4STANDBY_ERR;
        }
        done += chunk;
        left -= chunk;

        /* Describe this chunk in the control block before sending it. */
        ctrl.cursize = chunk;
        ctrl.endflag = (left == 0) ? CM_TRUE : CM_FALSE;
        ctrl.dss_head.size = chunk + sizeof(big_packets_ctrl_t);
        if (dss_send2standby(&ctrl, read_buff) != CM_SUCCESS) {
            LOG_RUN_ERR(
                "read volume for standby send msg failed, vg name[%s], volume id[%u].", req->vg_name, req->volumeid);
            return CM_ERROR;
        }
        LOG_DEBUG_INF("[MES] load disk from active info vg name(%s) volume id(%u) msg seq(%hu) msg len(%u).",
            req->vg_name, req->volumeid, ctrl.seq, ctrl.cursize);
        ctrl.offset += chunk;
        ctrl.seq++;
    }
    return CM_SUCCESS;
}
/*
 * Serve a load-disk request: validate the size, make sure the session has a
 * read buffer, then stream the data with dss_batch_load_core while holding
 * the vg shared lock.
 *
 * Returns DSS_READ4STANDBY_ERR when the size is not a multiple of
 * DSS_DISK_UNIT_SIZE or the buffer cannot be allocated, otherwise the result
 * of dss_batch_load_core.
 */
int32 dss_batch_load(dss_session_t *session, dss_loaddisk_req_t *req, uint32 version)
{
    if (req->size % DSS_DISK_UNIT_SIZE != 0) {
        return DSS_READ4STANDBY_ERR;
    }

    /* The read buffer is allocated on first use and kept on the session. */
    char *load_buf = session->thv_read_buf;
    if (load_buf == NULL) {
        load_buf = (char *)cm_malloc_align(DSS_DISK_UNIT_SIZE, DSS_LOADDISK_BUFFER_SIZE);
        session->thv_read_buf = load_buf;
        if (load_buf == NULL) {
            DSS_RETURN_IFERR2(
                DSS_READ4STANDBY_ERR, DSS_THROW_ERROR(ERR_ALLOC_MEMORY, DSS_LOADDISK_BUFFER_SIZE, "g_thv_read_buf"));
        }
    }
    (void)memset_s(session->thv_read_buf, DSS_LOADDISK_BUFFER_SIZE, 0, DSS_LOADDISK_BUFFER_SIZE);

    dss_lock_vg_mem_and_shm_ex_s(session, req->vg_name);
    int32 load_ret = dss_batch_load_core(session, req, session->thv_read_buf, version);
    /* Debug aid: interpret the start of the buffer as a common block header. */
    dss_common_block_t *block_hdr = (dss_common_block_t *)session->thv_read_buf;
    LOG_DEBUG_INF("[MES] dss_batch_load Exec load disk req, type: %u, id:%s, version:%llu.", block_hdr->type,
        dss_display_metaid(block_hdr->id), block_hdr->version);
    dss_unlock_vg_mem_and_shm_ex(session, req->vg_name);
    return load_ret;
}
/*
 * Processor for DSS_CMD_REQ_LOAD_DISK.
 * A request whose size is not exactly sizeof(dss_loaddisk_req_t) is dropped.
 * When dss_batch_load fails, the error is reported to the requester through
 * dss_proc_remote_req_err.
 */
void dss_proc_loaddisk_req(dss_session_t *session, mes_msg_t *msg)
{
    dss_loaddisk_req_t *load_req = (dss_loaddisk_req_t *)msg->buffer;
    dss_message_head_t *head = &load_req->dss_head;

    if (head->size != sizeof(dss_loaddisk_req_t)) {
        LOG_RUN_ERR("Invalid reveive msg size from remote failed, src node(%hu), dst node(%hu).",
            head->src_inst, head->dst_inst);
        return;
    }
    LOG_DEBUG_INF("[MES] Exec load disk req, src node(%hu), volume id:%u, offset:%llu, size:%u.",
        head->src_inst, load_req->volumeid, load_req->offset, load_req->size);

    int32 ret = dss_batch_load(session, load_req, head->msg_proto_ver);
    if (ret == CM_SUCCESS) {
        return;
    }
    LOG_RUN_ERR("Exec load disk req failed, src node:%u, volume id:%u, offset:%llu, size:%u.",
        (uint32)(head->src_inst), load_req->volumeid, load_req->offset, load_req->size);
    dss_proc_remote_req_err(session, &load_req->dss_head, DSS_CMD_ACK_LOAD_DISK, ret);
}
/*
 * Prepare a load-disk request: zero it, copy DSS_MAX_NAME_LEN bytes of the
 * vg name from entry, and resolve the local and remote node ids.
 *
 * Returns CM_ERROR when the local node is also the target (no message is
 * needed), the status of dss_get_exec_nodeid on lookup failure, otherwise
 * CM_SUCCESS.
 */
static status_t dss_init_readvlm_remote_params(
    dss_loaddisk_req_t *req, const char *entry, uint32 *currid, uint32 *remoteid, dss_session_t *session)
{
    errno_t rc = memset_s(req, sizeof(dss_loaddisk_req_t), 0, sizeof(dss_loaddisk_req_t));
    securec_check_ret(rc);

    /* NOTE(review): copies a fixed DSS_MAX_NAME_LEN bytes, so entry is presumably a buffer of at least that size - confirm. */
    rc = memcpy_s(req->vg_name, DSS_MAX_NAME_LEN, entry, DSS_MAX_NAME_LEN);
    securec_check_ret(rc);

    DSS_RETURN_IF_ERROR(dss_get_exec_nodeid(session, currid, remoteid));
    if (*currid != *remoteid) {
        return CM_SUCCESS;
    }
    LOG_DEBUG_ERR("read from current node %u no need to send message.", *currid);
    return CM_ERROR;
}
/*
 * Sanity-check the control block of one received fragment.
 *
 * A fragment without the end flag is accepted only when the requested size
 * equals the announced total size; a fragment with the end flag must end
 * exactly at the total size. On success the control block is copied into
 * *lastctrl and CM_TRUE is returned.
 */
static bool32 dss_packets_verify(big_packets_ctrl_t *lastctrl, big_packets_ctrl_t *ctrl, uint32 size)
{
    bool32 is_last = (ctrl->endflag == CM_TRUE);

    if (!is_last && size != ctrl->totalsize) {
        LOG_RUN_ERR("[MES] end flag is not CM_TRUE.");
        return CM_FALSE;
    }
    if (is_last && ctrl->cursize + ctrl->offset != ctrl->totalsize) {
        LOG_RUN_ERR("[MES]size is not true, cursize is %u, offset is %u, total size is %u.", ctrl->cursize,
            ctrl->offset, ctrl->totalsize);
        return CM_FALSE;
    }

    *lastctrl = *ctrl;
    return CM_TRUE;
}
/*
 * Receive the fragments answering ruid and assemble them into buf.
 *
 * ruid - request id whose responses are collected.
 * buf  - destination buffer of at least size bytes.
 * size - number of bytes requested.
 *
 * Each fragment starts with a big_packets_ctrl_t giving its offset and
 * length; fragments are received until one carries the end flag.
 *
 * Returns CM_SUCCESS when the last fragment has been copied,
 * ERR_DSS_VERSION_NOT_MATCH when the peer rejected the protocol version,
 * the peer's result for a head-only reply, the receive status on transport
 * failure, or CM_ERROR for a malformed fragment.
 */
static status_t dss_rec_msgs(ruid_type ruid, void *buf, uint32 size)
{
    mes_msg_t msg;
    big_packets_ctrl_t lastctrl;
    (void)memset_s(&lastctrl, sizeof(big_packets_ctrl_t), 0, sizeof(big_packets_ctrl_t));
    dss_config_t *inst_cfg = dss_get_inst_cfg();
    uint32 timeout = inst_cfg->params.mes_wait_timeout;
    bool32 is_end = CM_FALSE;
    do {
        status_t ret = dss_get_mes_response(ruid, &msg, timeout);
        if (ret != CM_SUCCESS) {
            LOG_RUN_ERR("dss server receive msg from remote node failed, result:%d.", ret);
            return ret;
        }
        dss_message_head_t *ack_head = (dss_message_head_t *)msg.buffer;
        if (ack_head->result == ERR_DSS_VERSION_NOT_MATCH) {
            mes_release_msg(&msg);
            return ERR_DSS_VERSION_NOT_MATCH;
        }
        if (ack_head->size < sizeof(big_packets_ctrl_t)) {
            ret = CM_ERROR;
            LOG_RUN_ERR("Dss load disk from remote node failed invalid size, msg len(%d) error.", ack_head->size);
            /* A head-only reply is the peer's error report: pass its result on. */
            if (ack_head->size == DSS_MES_MSG_HEAD_SIZE) {
                ret = ack_head->result;
            }
            mes_release_msg(&msg);
            return ret;
        }
        big_packets_ctrl_t *ctrl = (big_packets_ctrl_t *)msg.buffer;
        if (dss_packets_verify(&lastctrl, ctrl, size) == CM_FALSE) {
            mes_release_msg(&msg);
            LOG_RUN_ERR("dss server receive msg verify failed.");
            return CM_ERROR;
        }
        /* The sum is widened so a hostile offset/cursize pair cannot wrap past the bound check. */
        if ((uint64)ctrl->offset + (uint64)ctrl->cursize > (uint64)size ||
            ack_head->size != (sizeof(big_packets_ctrl_t) + ctrl->cursize)) {
            mes_release_msg(&msg);
            LOG_RUN_ERR("dss server receive msg size is invalid.");
            return CM_ERROR;
        }
        /* ctrl points into msg.buffer: capture the end flag before the message is released. */
        is_end = (ctrl->endflag == CM_TRUE) ? CM_TRUE : CM_FALSE;
        errno_t errcode =
            memcpy_s((char *)buf + ctrl->offset, ctrl->cursize, msg.buffer + sizeof(big_packets_ctrl_t), ctrl->cursize);
        mes_release_msg(&msg);
        securec_check_ret(errcode);
    } while (!is_end);
    return CM_SUCCESS;
}
/*
 * Send a prepared load-disk request and collect the data into buf.
 * When the peer rejects the protocol version, the request's version is
 * refreshed from the recorded remote version and the request is sent again.
 * session is not used by this function.
 */
static status_t dss_read_volume_remote_core(dss_session_t *session, dss_loaddisk_req_t *req, void *buf)
{
    status_t ret = CM_ERROR;
    for (;;) {
        dss_message_head_t *head = &req->dss_head;
        LOG_DEBUG_INF("[MES] Ready msg cmd:%u, src node:%u, dst node:%u end", head->dss_cmd,
            (uint32)(head->src_inst), (uint32)(head->dst_inst));

        ret = mes_send_request(head->dst_inst, head->flags, &head->ruid, (char *)req, head->size);
        if (ret != CM_SUCCESS) {
            LOG_RUN_ERR("The dssserver fails to send messages to the remote node, src node (%u), dst node(%u).",
                head->src_inst, head->dst_inst);
            return ret;
        }

        ret = dss_rec_msgs(head->ruid, buf, req->size);
        if (ret != ERR_DSS_VERSION_NOT_MATCH) {
            break;
        }
        req->dss_head.msg_proto_ver = dss_get_remote_proto_ver(req->dss_head.dst_inst);
    }
    return ret;
}
/*
 * Read size bytes at offset of a volume from the remote node into buf.
 *
 * A temporary session is created for the call and destroyed before
 * returning. Returns CM_ERROR when the session cannot be created, the
 * request cannot be prepared or the local node is the target; otherwise the
 * status of the remote read.
 */
status_t dss_read_volume_remote(const char *vg_name, dss_volume_t *volume, int64 offset, void *buf, int32 size)
{
    dss_loaddisk_req_t load_req;
    dss_session_t *tmp_session = NULL;
    uint32 peer_id = DSS_INVALID_ID32;
    uint32 self_id = DSS_INVALID_ID32;
    uint32 vol_id = volume->id;

    if (dss_create_session(NULL, &tmp_session) != CM_SUCCESS) {
        LOG_RUN_ERR("read volume from active node create session failed.");
        return CM_ERROR;
    }

    status_t ret = dss_init_readvlm_remote_params(&load_req, vg_name, &self_id, &peer_id, tmp_session);
    if (ret != CM_SUCCESS || self_id == peer_id) {
        dss_destroy_session(tmp_session);
        return CM_ERROR;
    }
    LOG_DEBUG_INF(
        "instance %u start to load %d data of disk(%s) from the primary node:%u.", self_id, size, vg_name, peer_id);

    load_req.volumeid = vol_id;
    load_req.offset = (uint64)offset;
    load_req.size = (uint32)size;
    uint32 peer_proto_ver = dss_get_remote_proto_ver(peer_id);
    dss_init_mes_head(&load_req.dss_head, DSS_CMD_REQ_LOAD_DISK, 0, (uint16)self_id, (uint16)peer_id,
        sizeof(dss_loaddisk_req_t), peer_proto_ver, 0);

    ret = dss_read_volume_remote_core(tmp_session, &load_req, buf);
    dss_destroy_session(tmp_session);
    if (ret == CM_SUCCESS) {
        LOG_DEBUG_INF("[MES] load disk(%s) data from the active node success.", vg_name);
        return CM_SUCCESS;
    }
    LOG_RUN_ERR(
        "The dssserver receive messages from remote node failed, src node:%u, dst node:%u.", self_id, peer_id);
    return ret;
}
/*
 * Ask the remote node whether the local instance is registered in the
 * cluster (DSS_CMD_REQ_JOIN_CLUSTER).
 *
 * join_succ - set to CM_FALSE first, then to CM_TRUE when the reply reports
 *             the instance as registered.
 *
 * Returns CM_ERROR when the exchange fails or the peer reports an error,
 * otherwise CM_SUCCESS.
 */
status_t dss_join_cluster(bool32 *join_succ)
{
    *join_succ = CM_FALSE;
    LOG_DEBUG_INF("[MES] Try join cluster begin.");

    dss_config_t *cfg = dss_get_inst_cfg();
    dss_join_cluster_req_t join_req;
    join_req.reg_id = (uint32)(cfg->params.inst_id);

    dss_join_cluster_ack_t join_ack;
    status_t peer_status;
    status_t ret = dss_exec_on_remote(DSS_CMD_REQ_JOIN_CLUSTER, (char *)&join_req, sizeof(dss_join_cluster_req_t),
        (char *)&join_ack, sizeof(dss_join_cluster_ack_t), &peer_status);
    if (ret != CM_SUCCESS || peer_status != CM_SUCCESS) {
        LOG_RUN_ERR("Try join cluster exec fail.");
        return CM_ERROR;
    }

    if (join_ack.is_reg) {
        *join_succ = CM_TRUE;
    }
    LOG_DEBUG_INF("[MES] Try join cluster exec result:%u.", (uint32)*join_succ);
    return CM_SUCCESS;
}
/*
 * Processor for DSS_CMD_REQ_JOIN_CLUSTER: tell the requester whether the
 * instance it names (reg_id) is set in the local work-status map.
 *
 * A request of the wrong size is dropped without a reply. An out-of-range
 * reg_id is answered with is_reg = CM_FALSE.
 */
void dss_proc_join_cluster_req(dss_session_t *session, mes_msg_t *msg)
{
    dss_message_head_t *req_head = (dss_message_head_t *)msg->buffer;
    if (req_head->size != sizeof(dss_join_cluster_req_t)) {
        LOG_RUN_ERR("Proc join cluster from remote node:%u check req msg fail.", (uint32)(req_head->src_inst));
        return;
    }
    dss_join_cluster_req_t *req = (dss_join_cluster_req_t *)msg->buffer;
    /* The reply travels in the opposite direction, so source and destination are swapped. */
    uint16 dst_inst = req_head->src_inst;
    uint16 src_inst = req_head->dst_inst;
    uint32 version = req_head->msg_proto_ver;
    ruid_type ruid = req_head->ruid;
    LOG_DEBUG_INF(
        "[MES] Proc join cluster from remote node:%u reg node:%u begin.", (uint32)(req_head->src_inst), req->reg_id);
    dss_join_cluster_ack_t ack;
    dss_init_mes_head(
        &ack.ack_head, DSS_CMD_ACK_JOIN_CLUSTER, 0, src_inst, dst_inst, sizeof(dss_join_cluster_ack_t), version, ruid);
    ack.is_reg = CM_FALSE;
    ack.ack_head.result = CM_SUCCESS;
    uint64 work_status = dss_get_inst_work_status();
    /* reg_id comes from the peer: shifting a 64-bit value by 64 or more is undefined, so range-check it first. */
    if (req->reg_id >= DSS_MAX_INSTANCES) {
        LOG_RUN_ERR("Proc join cluster from remote node:%u, invalid reg node:%u.", (uint32)(req_head->src_inst),
            req->reg_id);
    } else if (work_status & ((uint64)0x1 << req->reg_id)) {
        ack.is_reg = CM_TRUE;
    }
    LOG_DEBUG_INF("[MES] Proc join cluster from remote node:%u, reg node:%u, is_reg:%u.", (uint32)(req_head->src_inst),
        req->reg_id, (uint32)ack.is_reg);
    int send_ret = mes_send_response(dst_inst, ack.ack_head.flags, ruid, (char *)&ack, ack.ack_head.size);
    if (send_ret != CM_SUCCESS) {
        LOG_RUN_ERR("Proc join cluster from remote node:%u, reg node:%u send ack fail.", (uint32)dst_inst, req->reg_id);
        return;
    }
    LOG_DEBUG_INF("[MES] Proc join cluster from remote node:%u, reg node:%u send ack size:%u end.", (uint32)dst_inst,
        req->reg_id, ack.ack_head.size);
}
/*
 * Install the parent block carried in a get-ft-block reply and resolve the
 * parent node.
 *
 * session     - session used for the shared-memory refresh.
 * output_info - parent_node, when non-NULL, receives the resolved node.
 * ack         - reply holding parent_node_id and parent_block.
 * ack_vg_item - vg the reply refers to.
 * shm_block   - receives the refreshed shared-memory block for a non-root
 *               parent.
 *
 * Does nothing when the reply has no parent (invalid parent_node_id).
 * Returns CM_ERROR when the block checksum or the node index is invalid or
 * the root copy fails, otherwise the status of the refresh.
 */
static status_t dss_get_node_by_path_inner(dss_session_t *session, dss_check_dir_output_t *output_info,
    dss_get_ft_block_ack_t *ack, dss_vg_info_item_t *ack_vg_item, dss_ft_block_t **shm_block)
{
    if (dss_cmp_blockid(ack->parent_node_id, DSS_INVALID_64)) {
        return CM_SUCCESS;
    }
    if (!dss_read_remote_checksum(ack->parent_block, DSS_BLOCK_SIZE)) {
        DSS_THROW_ERROR(ERR_DSS_MES_ILL, "Invalid get ft block ack msg block checksum error.");
        return CM_ERROR;
    }
    if (is_ft_root_block(ack->parent_node_id)) {
        dss_root_ft_block_t *ft_block = (dss_root_ft_block_t *)ack->parent_block;
        /* The node index comes from the peer: make sure it lies inside the received block. */
        if (ack->parent_node_id.item >= ft_block->ft_block.node_num) {
            DSS_THROW_ERROR(ERR_DSS_MES_ILL, "Invalid get ft block ack msg parent_node_id item error.");
            return CM_ERROR;
        }
        char *root = ack_vg_item->dss_ctrl->root;
        errno_t errcode = memcpy_s(root, DSS_BLOCK_SIZE, ack->parent_block, DSS_BLOCK_SIZE);
        if (errcode != EOK) {
            CM_THROW_ERROR(ERR_SYSTEM_CALL, (errcode));
            return CM_ERROR;
        }
        if (output_info->parent_node != NULL) {
            *output_info->parent_node =
                (gft_node_t *)((root + sizeof(dss_root_ft_block_t)) + ack->parent_node_id.item * sizeof(gft_node_t));
        }
        return CM_SUCCESS;
    }
    dss_block_id_t block_id = ack->parent_node_id;
    block_id.item = 0;
    *shm_block = NULL;
    /*
     * Use ack_vg_item directly instead of dereferencing output_info->item:
     * the caller stores the same vg there when item is non-NULL, and item is
     * allowed to be NULL.
     */
    status_t ret = dss_refresh_block_in_shm(
        session, ack_vg_item, block_id, DSS_BLOCK_TYPE_FT, ack->parent_block, (char **)shm_block);
    if (ret == CM_SUCCESS && output_info->parent_node != NULL) {
        *output_info->parent_node = dss_get_node_by_ft(*shm_block, ack->parent_node_id.item);
    }
    return ret;
}
/*
 * Resolve dir_path on the remote node (DSS_CMD_REQ_GET_FT_BLOCK) and install
 * the returned block(s) locally.
 *
 * session      - session used for the shared-memory refresh.
 * dir_path     - path to resolve; copied into the request.
 * type         - item type sent with the request.
 * output_info  - receives the vg item, the resolved node and (through
 *                dss_get_node_by_path_inner) the parent node; each output
 *                pointer inside it may be NULL except where checked below.
 * is_throw_err - not used by this function.
 *
 * Returns CM_ERROR for a copy failure or an invalid reply, the remote
 * status when the peer failed, otherwise the status of the parent handling.
 */
status_t dss_get_node_by_path_remote(dss_session_t *session, const char *dir_path, gft_item_type_t type,
    dss_check_dir_output_t *output_info, bool32 is_throw_err)
{
    dss_get_ft_block_req_t request;
    request.type = type;
    errno_t rc = strncpy_s(request.path, sizeof(request.path), dir_path, strlen(dir_path));
    DSS_SECUREC_SS_RETURN_IF_ERROR(rc, CM_ERROR);

    status_t peer_status;
    dss_get_ft_block_ack_t reply;
    status_t status = dss_exec_on_remote(DSS_CMD_REQ_GET_FT_BLOCK, (char *)&request, sizeof(dss_get_ft_block_req_t),
        (char *)&reply, sizeof(dss_get_ft_block_ack_t), &peer_status);
    DSS_RETURN_IFERR2(status, LOG_RUN_ERR("Try get node by path remote failed."));
    DSS_RETURN_IF_ERROR(peer_status);

    /* Validate the reply before anything from it is installed locally. */
    if (dss_cmp_blockid(reply.node_id, DSS_INVALID_64)) {
        DSS_THROW_ERROR(ERR_DSS_MES_ILL, "Invalid get ft node id ack msg error.");
        return CM_ERROR;
    }
    if (!dss_read_remote_checksum(reply.block, DSS_BLOCK_SIZE)) {
        DSS_THROW_ERROR(ERR_DSS_MES_ILL, "Invalid get ft block ack msg block checksum error.");
        return CM_ERROR;
    }
    dss_vg_info_item_t *reply_vg = dss_find_vg_item(reply.vg_name);
    if (reply_vg == NULL) {
        DSS_THROW_ERROR(ERR_DSS_MES_ILL, "Invalid get ft block ack msg vg_name is not exist.");
        return CM_ERROR;
    }
    if (output_info == NULL) {
        DSS_THROW_ERROR(ERR_DSS_MES_ILL, "Invalid param output_info, output_info is null.");
        return CM_ERROR;
    }
    if (output_info->item != NULL) {
        *output_info->item = reply_vg;
    }

    dss_ft_block_t *shm_block = NULL;
    if (!is_ft_root_block(reply.node_id)) {
        /* Ordinary block: refresh it in shared memory, then pick the node out of it. */
        dss_block_id_t block_id = reply.node_id;
        block_id.item = 0;
        if (output_info->item == NULL) {
            DSS_THROW_ERROR(ERR_DSS_MES_ILL, "Invalid param output_info->item, output_info->item is null.");
            return CM_ERROR;
        }
        status = dss_refresh_block_in_shm(
            session, *output_info->item, block_id, DSS_BLOCK_TYPE_FT, reply.block, (char **)&shm_block);
        DSS_RETURN_IF_ERROR(status);
        if (output_info->out_node != NULL) {
            *output_info->out_node = dss_get_node_by_ft(shm_block, reply.node_id.item);
        }
    } else {
        /* Root block: overwrite the vg's root copy and address the node inside it. */
        dss_root_ft_block_t *root_block = (dss_root_ft_block_t *)reply.block;
        if (reply.node_id.item >= root_block->ft_block.node_num) {
            DSS_THROW_ERROR(ERR_DSS_MES_ILL, "Invalid get ft block ack msg node_id item error.");
            return CM_ERROR;
        }
        char *root = reply_vg->dss_ctrl->root;
        rc = memcpy_s(root, DSS_BLOCK_SIZE, reply.block, DSS_BLOCK_SIZE);
        securec_check_ret(rc);
        if (output_info->out_node != NULL) {
            *output_info->out_node =
                (gft_node_t *)((root + sizeof(dss_root_ft_block_t)) + reply.node_id.item * sizeof(gft_node_t));
        }
    }
    return dss_get_node_by_path_inner(session, output_info, &reply, reply_vg, &shm_block);
}
/*
 * Ask the remote node to refresh an ft block (DSS_CMD_REQ_REFRESH_FT).
 *
 * Returns CM_ERROR when the vg name cannot be copied, the exchange fails,
 * the peer reports an error or the reply is not ok; otherwise CM_SUCCESS.
 */
status_t dss_refresh_ft_by_primary(dss_block_id_t blockid, uint32 vgid, char *vg_name)
{
    LOG_DEBUG_INF("[MES] Try refresh ft by primary begin.");

    dss_refresh_ft_req_t refresh_req;
    refresh_req.blockid = blockid;
    refresh_req.vgid = vgid;
    errno_t rc = strncpy_s(refresh_req.vg_name, sizeof(refresh_req.vg_name), vg_name, strlen(vg_name));
    if (rc != EOK) {
        LOG_DEBUG_ERR("Try refresh ft by primary req vg_name fail.");
        return CM_ERROR;
    }

    status_t peer_status;
    dss_refresh_ft_ack_t refresh_ack;
    status_t ret = dss_exec_on_remote(DSS_CMD_REQ_REFRESH_FT, (char *)&refresh_req, sizeof(dss_refresh_ft_req_t),
        (char *)&refresh_ack, sizeof(dss_refresh_ft_ack_t), &peer_status);
    if (ret != CM_SUCCESS || peer_status != CM_SUCCESS) {
        LOG_DEBUG_ERR("Try refresh ft by primary exec on remote fail.");
        return CM_ERROR;
    }

    LOG_DEBUG_INF("[MES] Try refresh ft by primary result:%u.", refresh_ack.is_ok);
    if (refresh_ack.is_ok) {
        return CM_SUCCESS;
    }
    LOG_DEBUG_ERR("Try refresh ft by primary ack is not ok.");
    return CM_ERROR;
}
/*
 * Core of the get-ft-block request: looks up req->path with dss_check_dir() and fills the ack
 * with the id and ft block of the node found, the name of the vg the lookup ended in and, when
 * a parent node was reported, the parent's id and ft block.
 *
 * session : session used for the lookup, the vg lock switch and the node latch.
 * req     : request carrying the path and the item type.
 * ack     : ack to be filled.
 * vg_item : in/out. When the lookup ends in a vg with a different id, the vg passed in is
 *           unlocked, the new one is locked with dss_lock_vg_mem_and_shm_s_force() and
 *           *vg_item is switched to it, so the caller unlocks the right vg afterwards.
 *
 * Returns CM_SUCCESS when the ack has been filled, an error status otherwise.
 */
static status_t dss_proc_get_ft_block_req_core(
    dss_session_t *session, dss_get_ft_block_req_t *req, dss_get_ft_block_ack_t *ack, dss_vg_info_item_t **vg_item)
{
    gft_node_t *target = NULL;
    gft_node_t *parent = NULL;
    dss_vg_info_item_t *resolved_vg = *vg_item;
    dss_check_dir_output_t lookup = {&target, &resolved_vg, &parent, CM_FALSE};
    DSS_RETURN_IF_ERROR(dss_check_dir(session, req->path, req->type, &lookup, CM_TRUE));

    /* The lookup may have ended in another vg; move the lock over before anything else. */
    if (resolved_vg->id != (*vg_item)->id) {
        LOG_DEBUG_INF("Change shm lock when get link path :%s, src vg id:%u, dst vg id:%u.", req->path, (*vg_item)->id,
            resolved_vg->id);
        dss_unlock_vg_mem_and_shm(session, *vg_item);
        *vg_item = resolved_vg;
        dss_lock_vg_mem_and_shm_s_force(session, *vg_item);
    }

    if (target == NULL) {
        LOG_RUN_ERR("Invalid param out_node, out_node is null.");
        return CM_ERROR;
    }

    ack->node_id = target->id;
    DSS_LOG_DEBUG_OP("[MES] Req out node, v:%u,au:%llu,block:%u,item:%u,type:%d,path:%s.", target->id.volume,
        (uint64)target->id.au, target->id.block, target->id.item, req->type, req->path);

    /* Copy the node's ft block; nodes that are not GFT_PATH are latched around the copy. */
    dss_ft_block_t *ft_block = dss_get_ft_by_node(target);
    if (target->type != GFT_PATH) {
        dss_latch_s_node(session, target, NULL);
    }
    errno_t rc = memcpy_s(ack->block, DSS_BLOCK_SIZE, ft_block, DSS_BLOCK_SIZE);
    if (target->type != GFT_PATH) {
        dss_unlatch_node(target);
    }
    /* The copy result is checked only after the latch has been released. */
    if (rc != EOK) {
        CM_THROW_ERROR(ERR_SYSTEM_CALL, (rc));
        return CM_ERROR;
    }

    rc = strncpy_sp(ack->vg_name, DSS_MAX_NAME_LEN, resolved_vg->vg_name, strlen(resolved_vg->vg_name));
    if (rc != EOK) {
        CM_THROW_ERROR(ERR_SYSTEM_CALL, (rc));
        return CM_ERROR;
    }

    /* No parent reported: mark the parent id with DSS_INVALID_64 and finish. */
    if (parent == NULL) {
        dss_set_blockid(&ack->parent_node_id, DSS_INVALID_64);
        return CM_SUCCESS;
    }

    ack->parent_node_id = parent->id;
    DSS_LOG_DEBUG_OP(
        "[MES] Req parent node: %s,type:%d,path:%s.", dss_display_metaid(parent->id), req->type, req->path);
    ft_block = dss_get_ft_by_node(parent);
    rc = memcpy_s(ack->parent_block, DSS_BLOCK_SIZE, ft_block, DSS_BLOCK_SIZE);
    if (rc != EOK) {
        CM_THROW_ERROR(ERR_SYSTEM_CALL, (rc));
        return CM_ERROR;
    }
    return CM_SUCCESS;
}
/*
 * Handler for a get-ft-block request received from a remote node: validates the request,
 * finds the vg named in req->path, lets dss_proc_get_ft_block_req_core() fill the ack while
 * the vg is locked, and sends the ack back as DSS_CMD_ACK_GET_FT_BLOCK.
 *
 * session : session used for vg locking and for sending error acks.
 * msg     : received message; its buffer is interpreted as dss_get_ft_block_req_t.
 *
 * A message whose size does not match the request struct is dropped without a reply. Every
 * later failure is answered exactly once through dss_proc_remote_req_err().
 */
void dss_proc_get_ft_block_req(dss_session_t *session, mes_msg_t *msg)
{
    if (msg->size != sizeof(dss_get_ft_block_req_t)) {
        LOG_RUN_ERR("Get ft block from remote node check req msg size fail.");
        return;
    }
    dss_get_ft_block_req_t *req = (dss_get_ft_block_req_t *)msg->buffer;
    /* The ack travels back to the requester, so source and destination are swapped. */
    uint16 src_inst = req->dss_head.dst_inst;
    uint16 dst_inst = req->dss_head.src_inst;
    ruid_type ruid = req->dss_head.ruid;
    uint32 proto_ver = req->dss_head.msg_proto_ver;
    if (req->type > GFT_LINK) {
        LOG_RUN_ERR("Get ft block from remote node:%u check req msg type:%d fail.", (uint32)dst_inst, req->type);
        dss_proc_remote_req_err(session, &req->dss_head, DSS_CMD_ACK_GET_FT_BLOCK, CM_ERROR);
        return;
    }
    status_t status = dss_check_device_path(req->path);
    if (status != CM_SUCCESS) {
        dss_proc_remote_req_err(session, &req->dss_head, DSS_CMD_ACK_GET_FT_BLOCK, status);
        return;
    }
    LOG_DEBUG_INF("[MES] Get ft block from remote node:%u, path:%s begin.", (uint32)dst_inst, req->path);

    uint32 beg_pos = 0;
    char vg_name[DSS_MAX_NAME_LEN];
    status = dss_get_name_from_path(req->path, &beg_pos, vg_name);
    if (status != CM_SUCCESS) {
        dss_proc_remote_req_err(session, &req->dss_head, DSS_CMD_ACK_GET_FT_BLOCK, status);
        /*
         * Stop here: vg_name may not have been filled in, and the requester has already been
         * answered, so continuing could read an indeterminate buffer and send a second ack.
         */
        return;
    }

    dss_get_ft_block_ack_t ack;
    dss_init_mes_head(&ack.ack_head, DSS_CMD_ACK_GET_FT_BLOCK, 0, src_inst, dst_inst, sizeof(dss_get_ft_block_ack_t),
        proto_ver, ruid);
    dss_vg_info_item_t *vg_item = dss_find_vg_item(vg_name);
    if (vg_item == NULL) {
        LOG_RUN_ERR("invalid vg name: %s ,Get vg item fail.", vg_name);
        DSS_THROW_ERROR(ERR_DSS_VG_NOT_EXIST, vg_name);
        /* Answer the requester like every other failure path instead of leaving it without a reply. */
        dss_proc_remote_req_err(session, &req->dss_head, DSS_CMD_ACK_GET_FT_BLOCK, CM_ERROR);
        return;
    }

    /* The core may switch vg_item to another vg; whatever it leaves there is what gets unlocked. */
    dss_lock_vg_mem_and_shm_s_force(session, vg_item);
    status = dss_proc_get_ft_block_req_core(session, req, &ack, &vg_item);
    dss_unlock_vg_mem_and_shm(session, vg_item);
    if (status != CM_SUCCESS) {
        dss_proc_remote_req_err(session, &req->dss_head, DSS_CMD_ACK_GET_FT_BLOCK, status);
        return;
    }

    ack.ack_head.result = CM_SUCCESS;
    int send_ret = mes_send_response(dst_inst, ack.ack_head.flags, ruid, (char *)&ack, ack.ack_head.size);
    if (send_ret != CM_SUCCESS) {
        LOG_RUN_ERR("Get ft block from remote node:%u, path:%s send ack fail.", (uint32)(dst_inst), req->path);
    } else {
        LOG_DEBUG_INF("[MES] Get ft block from remote node:%u, path:%s end.", (uint32)(dst_inst), req->path);
    }
}
/*
 * Handler for a refresh-ft request received from a remote node: refreshes the requested ft
 * block through dss_refresh_ft_block() and answers with DSS_CMD_ACK_REFRESH_FT.
 *
 * session : session used for the refresh and for sending the error ack.
 * msg     : received message; its buffer is interpreted as dss_refresh_ft_req_t.
 *
 * A message that fails the size checks is dropped without a reply. A failed refresh is
 * answered through dss_proc_remote_req_err(); a successful one with an ack whose is_ok is set.
 */
void dss_proc_refresh_ft_by_primary_req(dss_session_t *session, mes_msg_t *msg)
{
    /*
     * Check the size actually received before reading any request field: the size stored in
     * the head is written by the sender and says nothing about how many bytes arrived.
     */
    if (msg->size < sizeof(dss_refresh_ft_req_t)) {
        LOG_RUN_ERR("Refresh ft by primary check req msg size fail.");
        return;
    }
    dss_message_head_t *req_head = (dss_message_head_t *)msg->buffer;
    if (req_head->size != sizeof(dss_refresh_ft_req_t)) {
        LOG_RUN_ERR("Refresh ft by primary from remote node:%u check req msg fail.", (uint32)(req_head->src_inst));
        return;
    }
    dss_refresh_ft_req_t *refresh_ft_req = (dss_refresh_ft_req_t *)msg->buffer;
    LOG_DEBUG_INF("[MES] Refresh ft by primary from remote node:%u, blockid:%s, vgid:%u, vg_name:%s begin.",
        (uint32)(req_head->src_inst), dss_display_metaid(refresh_ft_req->blockid), refresh_ft_req->vgid,
        refresh_ft_req->vg_name);
    if (dss_refresh_ft_block(session, refresh_ft_req->vg_name, refresh_ft_req->vgid, refresh_ft_req->blockid) !=
        CM_SUCCESS) {
        LOG_RUN_ERR("Refresh ft by primary from remote node:%u, blockid:%s, vgid:%u, vg_name:%s refresh fail.",
            (uint32)(req_head->src_inst), dss_display_metaid(refresh_ft_req->blockid), refresh_ft_req->vgid,
            refresh_ft_req->vg_name);
        dss_proc_remote_req_err(session, &refresh_ft_req->dss_head, DSS_CMD_ACK_REFRESH_FT, CM_ERROR);
        return;
    }

    /* The ack travels back to the requester, so source and destination are swapped. */
    uint16 dst_inst = req_head->src_inst;
    uint16 src_inst = req_head->dst_inst;
    uint32 version = req_head->msg_proto_ver;
    ruid_type ruid = req_head->ruid;
    dss_refresh_ft_ack_t ack;
    dss_init_mes_head(
        &ack.ack_head, DSS_CMD_ACK_REFRESH_FT, 0, src_inst, dst_inst, sizeof(dss_refresh_ft_ack_t), version, ruid);
    ack.is_ok = CM_TRUE;
    ack.ack_head.result = CM_SUCCESS;
    int send_ret = mes_send_response(dst_inst, ack.ack_head.flags, ruid, (char *)&ack, ack.ack_head.size);
    if (send_ret != CM_SUCCESS) {
        LOG_RUN_ERR("Refresh ft by primary from remote node:%u, blockid:%s, vgid:%u, vg_name:%s send ack fail.",
            (uint32)dst_inst, dss_display_metaid(refresh_ft_req->blockid), refresh_ft_req->vgid,
            refresh_ft_req->vg_name);
        return;
    }
    LOG_DEBUG_INF("[MES] Refresh ft by primary from remote node:%u, blockid:%s, vgid:%u, vg_name:%s refresh end.",
        (uint32)dst_inst, dss_display_metaid(refresh_ft_req->blockid), refresh_ft_req->vgid, refresh_ft_req->vg_name);
}