* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* mes_uc.c
*
*
* IDENTIFICATION
* src/mec/mes_uc.c
*
* -------------------------------------------------------------------------
*/
#include <dlfcn.h>
#include "mes_log_module.h"
#include "cm_ip.h"
#include "cm_memory.h"
#include "cm_timer.h"
#include "cm_spinlock.h"
#include "cm_sync.h"
#include "cs_tcp.h"
#include "rc_reform.h"
#include "mes_uc_interface.h"
#include "mes_msg_pool.h"
#include "mes_queue.h"
#include "mes_uc.h"
static uint32_t MY_PID = 400;
#define MES_UC_BYTE_PER_PAGE_PI 8320
#define MES_UC_MAX_USER_DATA_LEN 512
#define DPLOG_MAX_NUM 20
#define DPUC_DATA_MSG_RESERVE 1024
#define DPUC_DATA_MSG_MAX 2048
#define NS_PER_MS 1000000
#define MES_UC_XNET_TIMEOUT_TIMES 11
#ifndef RETURN_OK
#define RETURN_OK 0
#endif
#ifndef RETURN_ERROR
#define RETURN_ERROR (-1)
#endif
#define MES_UC_ALLOC_PAGES_SYNC(page_num, sgl_ptr) \
mes_global_handle()->allocate_multi_pages_sync((page_num), (sgl_ptr), (MY_PID), __FUNCTION__, __LINE__)
#define MES_UC_FREE_PAGES(sgl_ptr) mes_global_handle()->free_multi_pages((sgl_ptr), (MY_PID), __FUNCTION__, __LINE__)
#define MES_HOST_NAME(id) ((char *)g_mes.profile.inst_arr[id].ip)
#define MES_SHOULD_RECONN(bits, id) (((bits) >> (id)) & 0x1)
#define MES_UC_RETURN_IFERR(ret) \
do { \
int32_t _status_ = (ret); \
if (SECUREC_UNLIKELY(_status_ != DP_OK)) { \
return _status_; \
} \
} while (0)
typedef struct st_mes_uc_config {
uint32 lsid;
dpuc_comm_mgr *com_mgr;
dpuc_eid_obj *eid_obj;
dpuc_eid_t eid;
dpuc_eid_t dst_eid[OG_MAX_INSTANCES];
} mes_uc_config_t;
typedef struct {
uint64 start_time;
uint32 cmd;
} mes_uc_send_context;
typedef struct st_mes_uc_recv_thread {
bool8 thread_ready;
dtc_msgqueue_t msg_queue;
thread_lock_t lock;
} mes_uc_recv_thread_t;
mes_uc_config_t g_mes_uc_config;
mes_uc_recv_thread_t g_mes_uc_recv_thead[OG_MES_MAX_REACTOR_THREAD_NUM];
mes_uc_conn_t g_mes_uc_channel_status[OG_MAX_INSTANCES];
spinlock_t g_thread_queue_id_lock;
uint8 g_thread_id = 0;
__thread uint8 g_thread_queue_id = 0xFF;
static mes_interface_t g_mes_interface = { .uc_handle = NULL };
thread_t g_mes_channel_check_thread;
uint64 g_channel_reconn_bits;
cm_thread_cond_t g_reconn_thread_cond;
cm_thread_cond_t g_uc_io_finish_cond;
mes_interface_t *mes_global_handle(void)
{
return &g_mes_interface;
}
static status_t mes_load_symbol(void *lib_handle, char *symbol, void **sym_lib_handle)
{
const char *dlsym_err = NULL;
*sym_lib_handle = dlsym(lib_handle, symbol);
dlsym_err = dlerror();
if (dlsym_err != NULL) {
OG_THROW_ERROR(ERR_LOAD_SYMBOL, symbol, dlsym_err);
return OG_ERROR;
}
return OG_SUCCESS;
}
static status_t uc_init_lib(void)
{
mes_interface_t *intf = &g_mes_interface;
const char *dlopen_err = NULL;
intf->uc_handle = dlopen("libdbstorClient.so", RTLD_LAZY);
dlopen_err = dlerror();
if (intf->uc_handle == NULL) {
OG_LOG_RUN_WAR("Failed to load libdbstorClient.so, trying libdbstoreClient.so instead, original error: %s",
dlopen_err);
intf->uc_handle = dlopen("libdbstoreClient.so", RTLD_LAZY);
dlopen_err = dlerror();
if (intf->uc_handle == NULL) {
OG_LOG_RUN_ERR("Failed to load libdbstoreClient.so, maybe lib path error, errno %s", dlopen_err);
return OG_ERROR;
}
}
OG_RETURN_IFERR(mes_load_symbol(intf->uc_handle, "dpuc_msg_alloc", (void **)(&intf->dpuc_msg_alloc)));
OG_RETURN_IFERR(mes_load_symbol(intf->uc_handle, "dpuc_msg_free", (void **)(&intf->dpuc_msg_free)));
OG_RETURN_IFERR(mes_load_symbol(intf->uc_handle, "dpuc_msgparam_set", (void **)(&intf->dpuc_msgparam_set)));
OG_RETURN_IFERR(
mes_load_symbol(intf->uc_handle, "dpuc_msgmem_reg_integrate", (void **)(&intf->dpuc_msgmem_reg_integrate)));
OG_RETURN_IFERR(mes_load_symbol(intf->uc_handle, "dpuc_msg_send", (void **)(&intf->dpuc_msg_send)));
OG_RETURN_IFERR(mes_load_symbol(intf->uc_handle, "dpuc_msglen_get", (void **)(&intf->dpuc_msglen_get)));
OG_RETURN_IFERR(mes_load_symbol(intf->uc_handle, "dpuc_sgl_addr_set", (void **)(&intf->dpuc_sgl_addr_set)));
OG_RETURN_IFERR(mes_load_symbol(intf->uc_handle, "dpuc_sgl_addr_get", (void **)(&intf->dpuc_sgl_addr_get)));
OG_RETURN_IFERR(mes_load_symbol(intf->uc_handle, "dpuc_data_addr_get", (void **)(&intf->dpuc_data_addr_get)));
OG_RETURN_IFERR(mes_load_symbol(intf->uc_handle, "dpuc_eid_make", (void **)(&intf->dpuc_eid_make)));
OG_RETURN_IFERR(mes_load_symbol(intf->uc_handle, "dpuc_eid_reg", (void **)(&intf->dpuc_eid_reg)));
OG_RETURN_IFERR(mes_load_symbol(intf->uc_handle, "dpuc_set_src_eid_addr", (void **)(&intf->dpuc_set_src_eid_addr)));
OG_RETURN_IFERR(mes_load_symbol(intf->uc_handle, "dpuc_set_dst_eid_addr", (void **)(&intf->dpuc_set_dst_eid_addr)));
OG_RETURN_IFERR(mes_load_symbol(intf->uc_handle, "dpuc_set_sched_info", (void **)(&intf->dpuc_set_eid_reactor)));
OG_RETURN_IFERR(mes_load_symbol(intf->uc_handle, "dpuc_set_subhealth_threshold",
(void **)(&intf->dpuc_set_subhealth_threshold)));
OG_RETURN_IFERR(
mes_load_symbol(intf->uc_handle, "dpuc_process_set_config", (void **)(&intf->dpuc_process_set_config)));
OG_RETURN_IFERR(
mes_load_symbol(intf->uc_handle, "dpuc_xnet_set_process_ver", (void **)(&intf->dpuc_xnet_set_process_ver)));
OG_RETURN_IFERR(mes_load_symbol(intf->uc_handle, "dpuc_all_init", (void **)(&intf->dpuc_all_init)));
OG_RETURN_IFERR(
mes_load_symbol(intf->uc_handle, "dpuc_regist_link_event", (void **)(&intf->dpuc_regist_link_event)));
OG_RETURN_IFERR(
mes_load_symbol(intf->uc_handle, "dpuc_link_create_with_addr", (void **)(&intf->dpuc_link_create_with_addr)));
OG_RETURN_IFERR(mes_load_symbol(intf->uc_handle, "dpuc_qlink_close", (void **)(&intf->dpuc_qlink_close)));
OG_RETURN_IFERR(
mes_load_symbol(intf->uc_handle, "dpuc_set_security_cert_info", (void **)(&intf->dpuc_set_security_cert_info)));
OG_LOG_RUN_INF("load uc from libdbstorClient.so done");
return OG_SUCCESS;
}
static status_t dsw_init_lib(void)
{
mes_interface_t *intf = &g_mes_interface;
intf->dsw_handle = dlopen("libdswcore_mem.so", RTLD_LAZY);
const char *dlopen_err = NULL;
dlopen_err = dlerror();
if (intf->dsw_handle == NULL) {
OG_LOG_RUN_ERR("fail to load libdswcore_mem.so, maybe lib path error, errno %s", dlopen_err);
return OG_ERROR;
}
OG_RETURN_IFERR(mes_load_symbol(intf->dsw_handle, "dsw_core_init", (void **)(&intf->dsw_core_init)));
OG_LOG_RUN_INF("load libdswcore_mem.so done");
return OG_SUCCESS;
}
static status_t umm_init_lib(void)
{
mes_interface_t *intf = &g_mes_interface;
intf->umm_handle = dlopen("libdpumm_cmm.so", RTLD_LAZY);
const char *dlopen_err = NULL;
dlopen_err = dlerror();
if (intf->umm_handle == NULL) {
OG_LOG_RUN_ERR("fail to load libdpumm_cmm.so, maybe lib path error, errno %s", dlopen_err);
return OG_ERROR;
}
OG_RETURN_IFERR(
mes_load_symbol(intf->umm_handle, "allocMultiPagesSync", (void **)(&intf->allocate_multi_pages_sync)));
OG_RETURN_IFERR(mes_load_symbol(intf->umm_handle, "freeMultiPages", (void **)(&intf->free_multi_pages)));
OG_RETURN_IFERR(
mes_load_symbol(intf->umm_handle, "copyDataFromBufferToSgl", (void **)(&intf->copy_data_from_buf_to_sgl)));
OG_RETURN_IFERR(
mes_load_symbol(intf->umm_handle, "copyDataFromSglToBuffer", (void **)(&intf->copy_data_from_sgl_to_buf)));
OG_RETURN_IFERR(
mes_load_symbol(intf->umm_handle, "dpumm_set_config_path", (void **)(&intf->dpumm_set_config_path)));
OG_RETURN_IFERR(mes_load_symbol(intf->umm_handle, "getLastSgl", (void **)(&intf->get_last_sgl)));
OG_LOG_RUN_INF("load libdpumm_cmm.so done");
return OG_SUCCESS;
}
static status_t dplog_init_lib(void)
{
mes_interface_t *intf = &g_mes_interface;
intf->dplog_handle = dlopen("libdplog.so", RTLD_LAZY);
const char *dlopen_err = NULL;
dlopen_err = dlerror();
if (intf->dplog_handle == NULL) {
OG_LOG_RUN_ERR("fail to load libdplog.so, maybe lib path error, errno %s", dlopen_err);
return OG_ERROR;
}
OG_RETURN_IFERR(mes_load_symbol(intf->dplog_handle, "dplog_init", (void **)(&intf->dplog_init)));
OG_RETURN_IFERR(
mes_load_symbol(intf->dplog_handle, "dplog_set_backup_num", (void **)(&intf->dplog_set_backup_num)));
OG_RETURN_IFERR(
mes_load_symbol(intf->dplog_handle, "dplog_set_file_path_ext", (void **)(&intf->dplog_set_file_path_ext)));
OG_LOG_RUN_INF("load libdplog.so done");
return OG_SUCCESS;
}
static void mes_close_lib(void)
{
mes_interface_t *intf = &g_mes_interface;
if (intf->uc_handle != NULL) {
(void)dlclose(intf->uc_handle);
}
if (intf->dsw_handle != NULL) {
(void)dlclose(intf->dsw_handle);
}
if (intf->umm_handle != NULL) {
(void)dlclose(intf->umm_handle);
}
if (intf->dplog_handle != NULL) {
(void)dlclose(intf->dplog_handle);
}
}
void mes_destroy_uc(void)
{
uint32 i;
for (i = 0; i < g_mes.profile.reactor_thread_num; ++i) {
cm_thread_lock(&g_mes_uc_recv_thead[i].lock);
g_mes_uc_recv_thead[i].thread_ready = OG_FALSE;
}
mes_destory_message_pool();
for (i = 0; i < g_mes.profile.reactor_thread_num; ++i) {
cm_thread_unlock(&g_mes_uc_recv_thead[i].lock);
}
mes_close_lib();
}
static dpuc_msg *mes_uc_alloc_uc_msg(u32 msgLen)
{
dpuc_msg_alloc_param msg_param = { 0 };
msg_param.pEidObj = g_mes_uc_config.eid_obj;
msg_param.pMsgTemplate = NULL;
msg_param.uiSize = msgLen;
msg_param.ucDataType = DPUC_DATA;
msg_param.ucMsgType = DPUC_TYPE_POST;
return mes_global_handle()->dpuc_msg_alloc(&msg_param, __FUNCTION__);
}
static void mes_uc_free_uc_msg_sgl(dpuc_msg *ucMsg)
{
SGL_S *sgl = NULL;
if (ucMsg != NULL) {
sgl = mes_global_handle()->dpuc_sgl_addr_get(ucMsg, __FUNCTION__);
if (sgl != NULL) {
MES_UC_FREE_PAGES(sgl);
}
}
return;
}
static void mes_uc_free_uc_msg(dpuc_msg *ucMsg)
{
if (ucMsg != NULL) {
(void)mes_global_handle()->dpuc_msg_free(ucMsg, __FUNCTION__);
}
return;
}
static void mes_modify_last_entry_len(SGL_S *sgl, int len)
{
SGL_S *lastSgl = NULL;
uint32_t entryIdx = 0;
SGL_ENTRY_S *entry = NULL;
mes_global_handle()->get_last_sgl(sgl, &lastSgl, &entryIdx);
entry = &(lastSgl->entrys[entryIdx]);
entry->len = len;
return;
}
static status_t mes_uc_add_data_to_msg(dpuc_msg *mes_uc_msg, mes_message_head_t *head)
{
SGL_S *sgl = NULL;
uint32_t page_num;
page_num = (head->size + (MES_UC_BYTE_PER_PAGE_PI - 1)) / MES_UC_BYTE_PER_PAGE_PI;
MES_UC_ALLOC_PAGES_SYNC(page_num, &sgl);
if (sgl == NULL) {
MES_LOGGING(MES_LOGGING_SEND, "mes alloc sgl failed, page num %u", page_num);
return OG_ERROR;
}
if (mes_global_handle()->copy_data_from_buf_to_sgl(sgl, 0, (char *)head, head->size) != RETURN_OK) {
MES_LOGGING(MES_LOGGING_SEND, "mes copy data to sgl failed, page num %u, size %u", page_num, head->size);
MES_UC_FREE_PAGES(sgl);
return OG_ERROR;
}
mes_modify_last_entry_len(sgl, (head->size - (page_num - 1) * MES_UC_BYTE_PER_PAGE_PI));
if (mes_global_handle()->dpuc_sgl_addr_set(mes_uc_msg, sgl, head->size, __FUNCTION__) != DP_OK) {
MES_LOGGING(MES_LOGGING_SEND, "mes set sgl to uc failed, page num %u", page_num);
MES_UC_FREE_PAGES(sgl);
return OG_ERROR;
}
return OG_SUCCESS;
}
static status_t mes_uc_add_buf_list_to_msg(dpuc_msg *mes_uc_msg, mes_message_head_t *head, mes_bufflist_t *buff_list)
{
SGL_S *sgl = NULL;
uint32_t page_num;
uint32_t index;
uint32_t sgl_offset = 0;
page_num = (head->size + (MES_UC_BYTE_PER_PAGE_PI - 1)) / MES_UC_BYTE_PER_PAGE_PI;
MES_UC_ALLOC_PAGES_SYNC(page_num, &sgl);
if (sgl == NULL) {
MES_LOGGING(MES_LOGGING_SEND, "mes alloc sgl failed, page num %u", page_num);
return OG_ERROR;
}
for (index = 0; index < buff_list->cnt; index++) {
if (mes_global_handle()->copy_data_from_buf_to_sgl(sgl, sgl_offset, buff_list->buffers[index].buf,
buff_list->buffers[index].len) != RETURN_OK) {
OG_LOG_RUN_ERR("mes copy data to sgl failed, page num %u, size %u, index %u", page_num, head->size, index);
MES_UC_FREE_PAGES(sgl);
return OG_ERROR;
}
sgl_offset += buff_list->buffers[index].len;
}
mes_modify_last_entry_len(sgl, (head->size - (page_num - 1) * MES_UC_BYTE_PER_PAGE_PI));
if (mes_global_handle()->dpuc_sgl_addr_set(mes_uc_msg, sgl, head->size, __FUNCTION__) != DP_OK) {
MES_LOGGING(MES_LOGGING_SEND, "mes set sgl to uc failed, page num %u", page_num);
MES_UC_FREE_PAGES(sgl);
return OG_ERROR;
}
return OG_SUCCESS;
}
static status_t mes_uc_add_buf_list_to_msg_head(dpuc_msg *mes_uc_msg, mes_message_head_t *head,
mes_bufflist_t *buff_list)
{
uint32_t index;
errno_t err;
uint32_t data_offset = 0;
char *user_data = NULL;
user_data = (char *)mes_global_handle()->dpuc_data_addr_get(mes_uc_msg, __FUNCTION__);
if (user_data == NULL) {
MES_LOGGING(MES_LOGGING_SEND, "mes get dpuc data addr failed");
return OG_ERROR;
}
for (index = 0; index < buff_list->cnt; index++) {
err = memcpy_sp(user_data + data_offset, MES_UC_MAX_USER_DATA_LEN - data_offset, buff_list->buffers[index].buf,
buff_list->buffers[index].len);
MEMS_RETURN_IFERR(err);
data_offset += buff_list->buffers[index].len;
}
return OG_SUCCESS;
}
static int32_t mes_uc_send_msg_ack_callback(int32_t result, dpuc_msg_param_s *msg_param, void *context)
{
if (result != RETURN_OK) {
MES_LOGGING(MES_LOGGING_SEND, "mes uc send failed, ret %d, eid(0x%lx -> 0x%lx) opcode %u", result,
msg_param->sendEid, msg_param->recvEid, msg_param->uiOpcode);
}
if (msg_param == NULL) {
MES_LOGGING(MES_LOGGING_SEND, "mes uc ack param is null, ret %d", result);
return RETURN_ERROR;
}
if (msg_param->pMsg == NULL) {
MES_LOGGING(MES_LOGGING_SEND, "mes uc ack param msg is null, ret %d", result);
return RETURN_ERROR;
}
mes_uc_send_context *pContext = (mes_uc_send_context *)context;
mes_consume_with_time(pContext->cmd, MES_TIME_TEST_SEND_ACK, pContext->start_time);
if (pContext != NULL) {
free(pContext);
}
mes_uc_free_uc_msg_sgl(msg_param->pMsg);
mes_uc_free_uc_msg(msg_param->pMsg);
return RETURN_OK;
}
static inline void mes_uc_free_mem(mes_uc_send_context *context, dpuc_msg *msg)
{
free(context);
mes_uc_free_uc_msg_sgl(msg);
mes_uc_free_uc_msg(msg);
}
static status_t mes_uc_alloc_msg(mes_message_head_t *head, dpuc_msg **mes_uc_msg)
{
errno_t err;
char *user_data = NULL;
if (head->size <= MES_UC_MAX_USER_DATA_LEN) {
*mes_uc_msg = mes_uc_alloc_uc_msg(head->size);
if (*mes_uc_msg == NULL) {
OG_LOG_RUN_ERR("mes alloc uc msg failed, cmd %u", head->cmd);
return OG_ERROR;
}
user_data = (char *)mes_global_handle()->dpuc_data_addr_get(*mes_uc_msg, __FUNCTION__);
if (user_data == NULL) {
OG_LOG_RUN_ERR("mes get data addr failed, cmd %u", head->cmd);
return OG_ERROR;
}
err = memcpy_sp(user_data, MES_UC_MAX_USER_DATA_LEN, head, head->size);
MEMS_RETURN_IFERR(err);
} else {
*mes_uc_msg = mes_uc_alloc_uc_msg(0);
if (*mes_uc_msg == NULL) {
OG_LOG_RUN_ERR("mes alloc uc msg failed, cmd %u", head->cmd);
return OG_ERROR;
}
if (mes_uc_add_data_to_msg(*mes_uc_msg, head) != OG_SUCCESS) {
MES_LOGGING(MES_LOGGING_SEND, "mes add data to msg failed, cmd %u", head->cmd);
mes_uc_free_uc_msg(*mes_uc_msg);
return OG_ERROR;
}
}
return OG_SUCCESS;
}
status_t mes_uc_send_data(const void *msg_data)
{
int32_t ret;
uint64 stat_time = 0;
uint64 start_ack_time = 0;
mes_message_head_t *head = (mes_message_head_t *)msg_data;
dpuc_msg *mes_uc_msg = NULL;
uint8 dst_inst = head->dst_inst;
mes_uc_send_context *pContext = NULL;
if (mes_uc_connection_ready(dst_inst) != OG_TRUE) {
MES_LOGGING_WAR(MES_LOGGING_SEND,
"uc connection from %u to %u is not ready, cmd=%u, rsn=%u, src_sid=%u,"
"dst_sid=%u",
head->src_inst, head->dst_inst, head->cmd, head->rsn, head->src_sid, head->dst_sid);
return OG_ERROR;
}
mes_get_consume_time_start(&stat_time);
OG_RETURN_IFERR(mes_uc_alloc_msg(head, &mes_uc_msg));
ret = mes_global_handle()->dpuc_msgparam_set(mes_uc_msg, g_mes_uc_config.eid, g_mes_uc_config.dst_eid[dst_inst],
head->cmd, __FUNCTION__);
if (ret != DP_OK) {
MES_LOGGING(MES_LOGGING_SEND, "mes set uc msg param failed, src_eid 0x%lx, dst_eid 0x%lx, cmd %u",
g_mes_uc_config.eid, g_mes_uc_config.dst_eid[dst_inst], head->cmd);
mes_uc_free_uc_msg(mes_uc_msg);
return OG_ERROR;
}
pContext = (mes_uc_send_context *)malloc(sizeof(mes_uc_send_context));
if (pContext == NULL) {
MES_LOGGING(MES_LOGGING_SEND, "mes set uc send context failed, src_eid 0x%lx, dst_eid 0x%lx, cmd %u",
g_mes_uc_config.eid, g_mes_uc_config.dst_eid[dst_inst], head->cmd);
return OG_ERROR;
}
pContext->cmd = head->cmd;
mes_get_consume_time_start(&start_ack_time);
pContext->start_time = start_ack_time;
ret = mes_global_handle()->dpuc_msg_send(mes_uc_msg, mes_uc_send_msg_ack_callback, pContext, __FUNCTION__);
if (ret != DP_OK) {
MES_LOGGING(MES_LOGGING_SEND, "mes send post msg failed, src_eid 0x%lx, dst_eid 0x%lx, cmd %u, ret %d",
g_mes_uc_config.eid, g_mes_uc_config.dst_eid[head->dst_inst], head->cmd, ret);
mes_uc_free_mem(pContext, mes_uc_msg);
return OG_ERROR;
}
mes_consume_with_time(head->cmd, MES_TIME_SEND_IO, stat_time);
return OG_SUCCESS;
}
static status_t mes_uc_alloc_buff_msg(mes_bufflist_t *buff_list, mes_message_head_t *head, dpuc_msg **mes_uc_msg)
{
if (head->size <= MES_UC_MAX_USER_DATA_LEN) {
*mes_uc_msg = mes_uc_alloc_uc_msg(head->size);
if (*mes_uc_msg == NULL) {
OG_LOG_RUN_ERR("mes alloc uc msg failed, cmd %u", head->cmd);
return OG_ERROR;
}
if (mes_uc_add_buf_list_to_msg_head(*mes_uc_msg, head, buff_list) != OG_SUCCESS) {
OG_LOG_RUN_ERR("mes add bufflist to msg head failed, cmd %u", head->cmd);
mes_uc_free_uc_msg(*mes_uc_msg);
return OG_ERROR;
}
} else {
*mes_uc_msg = mes_uc_alloc_uc_msg(0);
if (*mes_uc_msg == NULL) {
OG_LOG_RUN_ERR("mes alloc uc msg failed, cmd %u", head->cmd);
return OG_ERROR;
}
if (mes_uc_add_buf_list_to_msg(*mes_uc_msg, head, buff_list) != OG_SUCCESS) {
OG_LOG_RUN_ERR("mes add bufflist to msg failed, cmd %u", head->cmd);
mes_uc_free_uc_msg(*mes_uc_msg);
return OG_ERROR;
}
}
return OG_SUCCESS;
}
status_t mes_uc_send_bufflist(mes_bufflist_t *buff_list)
{
int32_t ret;
uint64 stat_time = 0;
uint64 start_ack_time = 0;
mes_message_head_t *head = (mes_message_head_t *)(buff_list->buffers[0].buf);
dpuc_msg *mes_uc_msg = NULL;
uint8 dst_inst = head->dst_inst;
mes_uc_send_context *pContext = NULL;
if (mes_uc_connection_ready(dst_inst) != OG_TRUE) {
MES_LOGGING_WAR(MES_LOGGING_SEND,
"uc connection from %u to %u is not ready, cmd=%u, rsn=%u, src_sid=%u,"
"dst_sid=%u",
head->src_inst, head->dst_inst, head->cmd, head->rsn, head->src_sid, head->dst_sid);
return OG_ERROR;
}
mes_get_consume_time_start(&stat_time);
OG_RETURN_IFERR(mes_uc_alloc_buff_msg(buff_list, head, &mes_uc_msg));
ret = mes_global_handle()->dpuc_msgparam_set(mes_uc_msg, g_mes_uc_config.eid, g_mes_uc_config.dst_eid[dst_inst],
head->cmd, __FUNCTION__);
if (ret != DP_OK) {
MES_LOGGING(MES_LOGGING_SEND, "mes set uc msg param failed, src_eid 0x%lx, dst_eid 0x%lx, cmd %u",
g_mes_uc_config.eid, g_mes_uc_config.dst_eid[dst_inst], head->cmd);
mes_uc_free_uc_msg(mes_uc_msg);
return OG_ERROR;
}
pContext = (mes_uc_send_context *)malloc(sizeof(mes_uc_send_context));
if (pContext == NULL) {
MES_LOGGING(MES_LOGGING_SEND, "mes set uc send context failed, src_eid 0x%lx, dst_eid 0x%lx, cmd %u",
g_mes_uc_config.eid, g_mes_uc_config.dst_eid[dst_inst], head->cmd);
return OG_ERROR;
}
pContext->cmd = head->cmd;
mes_get_consume_time_start(&start_ack_time);
pContext->start_time = start_ack_time;
ret = mes_global_handle()->dpuc_msg_send(mes_uc_msg, mes_uc_send_msg_ack_callback, pContext, __FUNCTION__);
if (ret != DP_OK) {
MES_LOGGING(MES_LOGGING_SEND, "mes send post msg failed, src_eid 0x%lx, dst_eid 0x%lx, cmd %u, ret %d",
g_mes_uc_config.eid, g_mes_uc_config.dst_eid[head->dst_inst], head->cmd, ret);
mes_uc_free_mem(pContext, mes_uc_msg);
return OG_ERROR;
}
mes_consume_with_time(head->cmd, MES_TIME_SEND_IO, stat_time);
return OG_SUCCESS;
}
static status_t mes_uc_get_mes_msg_from_uc_head(dpuc_msg *uc_msg, mes_message_t *mes_msg)
{
errno_t err;
char *user_data = NULL;
mes_message_head_t *head = NULL;
user_data = (char *)mes_global_handle()->dpuc_data_addr_get(uc_msg, __FUNCTION__);
if (user_data == NULL) {
MES_LOGGING(MES_LOGGING_SEND, "mes recv uc msg head failed");
return OG_ERROR;
}
head = (mes_message_head_t *)user_data;
if (mes_check_msg_head(head) != OG_SUCCESS) {
OG_THROW_ERROR_EX(ERR_MES_ILEGAL_MESSAGE,
"mes message length=%u, cmd=%u, rsn=%u, src_inst=%u, dst_inst=%u, "
"src_sid=%u, dst_sid=%u, thead id=%d",
head->size, head->cmd, head->rsn, head->src_inst, head->dst_inst, head->src_sid,
head->dst_sid, g_thread_queue_id);
return OG_ERROR;
}
mes_get_message_buf(mes_msg, head);
if ((mes_msg->buffer == NULL) || (mes_msg->head == NULL)) {
MES_LOGGING(MES_LOGGING_SEND, "mes get msg buf failed");
return OG_ERROR;
}
err = memcpy_s(mes_msg->buffer, head->size, head, head->size);
MEMS_RETURN_IFERR(err);
return OG_SUCCESS;
}
static status_t mes_uc_get_mes_msg_from_uc_msg(dpuc_msg *uc_msg, mes_message_t *mes_msg)
{
SGL_S *uc_msg_data = NULL;
mes_message_head_t *head = NULL;
uc_msg_data = mes_global_handle()->dpuc_sgl_addr_get(uc_msg, __FUNCTION__);
if ((uc_msg_data == NULL) || (uc_msg_data->entrys[0].len < sizeof(mes_message_head_t))) {
mes_uc_free_uc_msg_sgl(uc_msg);
MES_LOGGING(MES_LOGGING_SEND, "mes recv uc msg sgl failed");
return OG_ERROR;
}
head = (mes_message_head_t *)uc_msg_data->entrys[0].buf;
if (mes_check_msg_head(head) != OG_SUCCESS) {
mes_uc_free_uc_msg_sgl(uc_msg);
OG_THROW_ERROR_EX(ERR_MES_ILEGAL_MESSAGE,
"mes message length=%u, cmd=%u, rsn=%u, src_inst=%u, dst_inst=%u, "
"src_sid=%u, dst_sid=%u, thead id=%d",
head->size, head->cmd, head->rsn, head->src_inst, head->dst_inst, head->src_sid,
head->dst_sid, g_thread_queue_id);
return OG_ERROR;
}
mes_get_message_buf(mes_msg, head);
if ((mes_msg->buffer == NULL) || (mes_msg->head == NULL)) {
MES_LOGGING(MES_LOGGING_SEND, "mes get msg buf failed");
mes_uc_free_uc_msg_sgl(uc_msg);
return OG_ERROR;
}
if (mes_global_handle()->copy_data_from_sgl_to_buf(uc_msg_data, 0, mes_msg->buffer, head->size) != RETURN_OK) {
MES_LOGGING(MES_LOGGING_SEND, "mes copy data from sgl to buf failed, size %u", head->size);
mes_free_buf_item(mes_msg->buffer);
mes_uc_free_uc_msg_sgl(uc_msg);
return OG_ERROR;
}
mes_uc_free_uc_msg_sgl(uc_msg);
return OG_SUCCESS;
}
static void mes_get_thread_id(void)
{
if (g_thread_queue_id == OG_INVALID_ID8) {
cm_spin_lock(&g_thread_queue_id_lock, NULL);
g_thread_queue_id = g_thread_id % OG_MES_MAX_REACTOR_THREAD_NUM;
g_thread_id++;
cm_spin_unlock(&g_thread_queue_id_lock);
OG_LOG_DEBUG_INF("set thread queue id = %d.", g_thread_queue_id);
}
}
static int32_t mes_uc_msg_recv_func(dpuc_msg *uc_msg, dpuc_msg_mem_free_mode_e *freeMode)
{
mes_message_t mes_msg = { NULL, NULL };
uint64 stat_time = 0;
status_t ret = OG_ERROR;
mes_get_consume_time_start(&stat_time);
if (uc_msg == NULL) {
MES_LOGGING(MES_LOGGING_SEND, "mes recv uc msg is invalid");
return RETURN_ERROR;
}
if (freeMode != NULL) {
*freeMode = DPUC_AUTO_FREE;
}
mes_get_thread_id();
cm_thread_lock(&g_mes_uc_recv_thead[g_thread_queue_id].lock);
if (!g_mes_uc_recv_thead[g_thread_queue_id].thread_ready) {
MES_LOGGING(MES_LOGGING_SEND, "get mes msg failed, recv thread is not ready");
cm_thread_unlock(&g_mes_uc_recv_thead[g_thread_queue_id].lock);
return RETURN_ERROR;
}
uint32 uc_msg_len = mes_global_handle()->dpuc_msglen_get(uc_msg, __FUNCTION__);
if (uc_msg_len >= sizeof(mes_message_head_t)) {
ret = mes_uc_get_mes_msg_from_uc_head(uc_msg, &mes_msg);
} else if (uc_msg_len == 0) {
ret = mes_uc_get_mes_msg_from_uc_msg(uc_msg, &mes_msg);
} else {
OG_LOG_RUN_ERR("[mes] mes uc recv message len is invalid");
}
if (ret != OG_SUCCESS) {
MES_LOGGING(MES_LOGGING_RECV, "get mes msg from uc failed");
cm_thread_unlock(&g_mes_uc_recv_thead[g_thread_queue_id].lock);
return RETURN_ERROR;
}
if (g_mes_uc_channel_status[mes_msg.head->src_inst].is_allow_msg_transfer != OG_TRUE) {
MES_LOGGING(MES_LOGGING_RECV, "mes not allow msg transfer, src_inst=%u.", mes_msg.head->src_inst);
cm_thread_unlock(&g_mes_uc_recv_thead[g_thread_queue_id].lock);
return RETURN_ERROR;
}
mes_consume_with_time(mes_msg.head->cmd, MES_TIME_READ_MES, stat_time);
if (g_mes.crc_check_switch) {
if (mes_message_vertify_cks(&mes_msg) != OG_SUCCESS) {
cm_thread_unlock(&g_mes_uc_recv_thead[g_thread_queue_id].lock);
OG_LOG_RUN_ERR("[mes] check cks failed, cmd=%u, rsn=%u, src_inst=%u, dst_inst=%u", mes_msg.head->cmd,
mes_msg.head->rsn, mes_msg.head->src_inst, mes_msg.head->dst_inst);
return RETURN_ERROR;
}
}
mes_process_message(&g_mes_uc_recv_thead[g_thread_queue_id].msg_queue, 0, &mes_msg, stat_time);
cm_thread_unlock(&g_mes_uc_recv_thead[g_thread_queue_id].lock);
return RETURN_OK;
}
static int32_t init_dpuc_log(char *running_log_path)
{
int32_t ret = DP_ERROR;
OG_RETURN_IFERR(dplog_init_lib());
ret = mes_global_handle()->dplog_init();
if (ret != DP_OK) {
OG_LOG_RUN_ERR("Init dplog failed (%d).", ret);
return ret;
}
OG_LOG_RUN_INF("Init dplog success.");
ret = mes_global_handle()->dplog_set_file_path_ext((char *)running_log_path, (char *)running_log_path);
if (ret != DP_OK) {
OG_LOG_RUN_ERR("Set dplog path failed(%d).", ret);
return ret;
}
OG_LOG_RUN_INF("Set dplog path success.");
ret = mes_global_handle()->dplog_set_backup_num(DPLOG_MAX_NUM);
if (ret != DP_OK) {
OG_LOG_RUN_ERR("Set dplog backup num failed(%d).", ret);
return ret;
}
OG_LOG_RUN_INF("Set dplog backup num success.");
return DP_OK;
}
static uint8 lsid_to_inst_id(u32 uiDstlsId)
{
uint8 i;
for (i = 0; i < g_mes.profile.inst_count; i++) {
if (uiDstlsId == g_mes.profile.inst_lsid[i]) {
return i;
}
}
return OG_INVALID_ID8;
}
static int32_t create_link_callback(u32 uiDstlsId, dpuc_qlink_event qlinkEvent, dpuc_plane_type_e planeType,
dupc_qlink_cause_t qlinkCause)
{
OG_LOG_DEBUG_INF("Destination link (0x%x), status (%d), plane type (%d), reason (%d).", uiDstlsId, qlinkEvent,
planeType, qlinkCause);
uint8 inst_id = lsid_to_inst_id(uiDstlsId);
if (inst_id == OG_INVALID_ID8) {
OG_LOG_RUN_ERR("Not find valid inst id");
return DP_FAIL;
}
OG_LOG_RUN_INF("UC link %s, dst lsid=(0x%x)", (qlinkEvent == DPUC_QLINK_UP ? "up" : "down"), uiDstlsId);
if (inst_id == g_mes.profile.inst_id) {
OG_LOG_RUN_INF("other inst create link to own inst");
return DP_OK;
}
mes_uc_conn_t *conn = &g_mes_uc_channel_status[inst_id];
mes_channel_stat_t pre_status = conn->uc_channel_state;
cm_thread_lock(&conn->lock);
if (qlinkEvent == DPUC_QLINK_UP) {
conn->uc_channel_state = MES_CHANNEL_CONNECTED;
OG_LOG_RUN_INF("channel status covert to CONNECTED.");
} else if ((qlinkEvent == DPUC_QLINK_DOWN) && (conn->uc_channel_state == MES_CHANNEL_CONNECTED)) {
conn->uc_channel_state = MES_CHANNEL_UNCONNECTED;
OG_LOG_RUN_INF("channel status covert to UNCONNECTED.");
rc_bitmap64_set(&g_channel_reconn_bits, inst_id);
cm_release_cond_signal(&g_reconn_thread_cond);
OG_LOG_RUN_INF("cm realse cond signal success.");
} else {
OG_LOG_RUN_WAR("qlinkEvent is invalid");
}
cm_thread_unlock(&conn->lock);
OG_LOG_RUN_INF("UC link %s, dst lsid=(0x%x), inst id=%d, pre_status=%d, cur_status=%d",
(qlinkEvent == DPUC_QLINK_UP ? "up" : "down"), uiDstlsId, inst_id, pre_status,
conn->uc_channel_state);
return DP_OK;
}
static int32_t link_state_change_callback(u32 uiDstlsId, dpuc_link_state_event_t qlinkEvent,
dpuc_plane_type_e planeType, void *param)
{
if ((qlinkEvent == DPUC_LINK_EVENT_IO_FINISH)) {
cm_release_cond_signal(&g_uc_io_finish_cond);
return DP_OK;
}
if ((qlinkEvent != DPUC_LINK_STATE_EVENT_SUBHEALTH_ORIGIN) &&
(qlinkEvent != DPUC_LINK_STATE_EVENT_SUBHEALTH_CLEAR)) {
OG_LOG_RUN_WAR("link state change, qlinkEvent (%d), dst lsid (0x%x)", qlinkEvent, uiDstlsId);
return DP_OK;
}
uint8 inst_id = lsid_to_inst_id(uiDstlsId);
if (inst_id == OG_INVALID_ID8) {
OG_LOG_RUN_ERR("Not find valid inst id");
return DP_FAIL;
}
if (inst_id == g_mes.profile.inst_id) {
OG_LOG_RUN_INF("link state change, dst inst is own inst.");
return DP_OK;
}
mes_uc_conn_t *conn = &g_mes_uc_channel_status[inst_id];
conn->uc_channel_state = (qlinkEvent == DPUC_LINK_STATE_EVENT_SUBHEALTH_ORIGIN ? MES_CHANNEL_SUBHEALTH
: MES_CHANNEL_CONNECTED);
if (param == NULL) {
OG_LOG_RUN_ERR("link state change, param is null.");
return DP_FAIL;
}
dpuc_subhealth_info_t *subhealth_info = (dpuc_subhealth_info_t *)param;
OG_LOG_RUN_WAR("slow event %s, dst lsid (0x%x), local ip (%s), remote ip (%s)",
(qlinkEvent == DPUC_LINK_STATE_EVENT_SUBHEALTH_ORIGIN ? "occur" : "recover"), uiDstlsId,
subhealth_info->local_ip, subhealth_info->remote_ip);
return DP_OK;
}
static int32_t create_and_reg_eid(mes_uc_config_t *uc_config, dpuc_msg_recv_s *msg_recv_func)
{
int32_t ret = DP_ERROR;
ret = mes_global_handle()->dpuc_eid_make(NORMAL_TYPE, MY_PID, 0, uc_config->lsid, &uc_config->eid, __FUNCTION__);
if (ret != DP_OK) {
OG_LOG_RUN_ERR("Generate eid failed.");
return ret;
}
OG_LOG_DEBUG_INF("Generate eid success.");
ret = mes_global_handle()->dpuc_eid_reg(uc_config->com_mgr, uc_config->eid, msg_recv_func, &uc_config->eid_obj,
__FUNCTION__);
if (ret != DP_OK) {
OG_LOG_RUN_ERR("Reg src eid failed, eid=0x%lx.", uc_config->eid);
return ret;
}
OG_LOG_DEBUG_INF("Reg src eid success, eid=0x%lx.", uc_config->eid);
dpucLinkEventOps link_event = { create_link_callback, link_state_change_callback, NULL };
ret = mes_global_handle()->dpuc_regist_link_event(uc_config->eid, &link_event, __FUNCTION__);
if (ret != DP_OK) {
OG_LOG_RUN_ERR("Reg create link status func failed.");
return ret;
}
OG_LOG_DEBUG_INF("Reg create link status func successs");
return ret;
}
static int32_t get_cpu_id(uint32_t *cpu_id, uint32 *cpu_amount)
{
int start = 0;
int end = 0;
char *cpu_info = get_g_mes_cpu_info();
char cpu_info_copy[OG_MES_MAX_CPU_STR] = { 0 };
char *next_token = NULL;
*cpu_amount = 0;
int32_t cpu_i = 0;
PRTS_RETURN_IFERR(memcpy_s(cpu_info_copy, OG_MES_MAX_CPU_STR, cpu_info, OG_MES_MAX_CPU_STR));
char *token = strtok_s(cpu_info_copy, ",", &next_token);
while (token != NULL) {
if (sscanf_s(token, "%d-%d", &start, &end) == 2) {
for (cpu_i = start; cpu_i <= end; cpu_i++) {
if (*cpu_amount >= OG_MES_MAX_REACTOR_THREAD_NUM) {
OG_LOG_RUN_ERR("CPU Core Limit Exceeded. MES_CPU_INFO can contain a maximum of %u cores",
OG_MES_MAX_REACTOR_THREAD_NUM);
return DP_ERROR;
}
cpu_id[*cpu_amount] = cpu_i;
(*cpu_amount)++;
}
} else if (sscanf_s(token, "%d", &start) == 1) {
if (*cpu_amount >= OG_MES_MAX_REACTOR_THREAD_NUM) {
OG_LOG_RUN_ERR("CPU Core Limit Exceeded. MES_CPU_INFO can contain a maximum of %u cores",
OG_MES_MAX_REACTOR_THREAD_NUM);
return DP_ERROR;
}
cpu_id[*cpu_amount] = start;
(*cpu_amount)++;
} else {
OG_LOG_RUN_ERR("MES_CPU_INFO contains unresolvable content: %s, MES_CPU_INFO:%s", token, cpu_info_copy);
return DP_ERROR;
}
token = strtok_s(NULL, ",", &next_token);
}
return DP_OK;
}
static void check_set_cpu_affinity(void)
{
if (g_mes.profile.set_cpu_affinity != OG_TRUE) {
OG_LOG_RUN_INF("No need to set CPU affinity.");
return;
}
#if !defined(__arm__) && !defined(__aarch64__)
g_mes.profile.set_cpu_affinity = OG_FALSE;
OG_LOG_RUN_INF("No need to set CPU affinity in non-ARM environments");
return;
#endif
if (g_mes.profile.pipe_type != CS_TYPE_UC_RDMA) {
OG_LOG_RUN_INF("No core binding if the link type is not RDMA");
g_mes.profile.set_cpu_affinity = OG_FALSE;
return;
}
char *mes_cpu_info = get_g_mes_cpu_info();
if (mes_cpu_info[0] == '\0') {
g_mes.profile.set_cpu_affinity = OG_FALSE;
OG_LOG_RUN_INF("Due to the lack of CPU info, MES will not set CPU affinity");
return;
}
OG_LOG_RUN_INF("CPU affinity can be configured");
}
static int32_t create_reactor(mes_uc_config_t *uc_config)
{
int32_t ret = DP_OK;
uint32_t threadNum = g_mes.profile.reactor_thread_num;
OG_LOG_DEBUG_INF("Set reacot thread num = %d.", threadNum);
if (threadNum > OG_MES_MAX_REACTOR_THREAD_NUM) {
OG_LOG_RUN_ERR("reator threadNum is excced, threadNum = %d", threadNum);
return DP_ERROR;
}
dpuc_xnet_thread_info_s threadInfo[OG_MES_MAX_REACTOR_THREAD_NUM];
uint32_t i;
uint32_t cpu_amount = 0;
uint32_t xnet_cpu_id[OG_MES_MAX_REACTOR_THREAD_NUM];
check_set_cpu_affinity();
if (g_mes.profile.set_cpu_affinity == OG_TRUE) {
int32_t ret_get_cpu_id = get_cpu_id(xnet_cpu_id, &cpu_amount);
if (ret_get_cpu_id != DP_OK) {
OG_LOG_RUN_ERR("MES failed to parse CPU core binding information. ret: %d; cpu_amount: %u; threadNum %u.",
ret_get_cpu_id, cpu_amount, threadNum);
return DP_ERROR;
}
}
if (g_mes.profile.set_cpu_affinity == OG_TRUE) {
threadNum = cpu_amount;
OG_LOG_RUN_INF("change xnet reactor threadNum to %u.", cpu_amount);
}
for (i = 0; i < threadNum; i++) {
threadInfo[i].pri = 0;
CPU_ZERO(&(threadInfo[i].cpu_set));
if (g_mes.profile.set_cpu_affinity != OG_TRUE) {
OG_LOG_RUN_INF("NOT set xnet reactor CPU Affinity, reactor_id: %u.", i);
continue;
}
OG_LOG_RUN_INF("Set xnet reactor CPU Affinity, reactor_id: %u, cpu_id: %u.", i, xnet_cpu_id[i]);
CPU_SET(xnet_cpu_id[i], &(threadInfo[i].cpu_set));
}
dpuc_sched_conf_info_s cfgInfo = { g_mes.profile.set_cpu_affinity, g_mes.profile.set_cpu_affinity, threadInfo,
threadNum };
ret = mes_global_handle()->dpuc_set_eid_reactor(uc_config->eid_obj, "mes_cfg_xnet", &cfgInfo, __FUNCTION__);
if (ret != DP_OK) {
OG_LOG_RUN_ERR("Generate reactor failed (%d).", ret);
return ret;
}
OG_LOG_DEBUG_INF("Generate reactor success.");
return ret;
}
static int32_t set_subhealth_threshold(void)
{
if (g_mes.profile.upgrade_time_ms == 0 || g_mes.profile.degrade_time_ms == 0) {
OG_LOG_RUN_INF("Use default subhealth threshold, upgrade time (%u ms), degrade time (%u ms).",
g_mes.profile.upgrade_time_ms, g_mes.profile.degrade_time_ms);
return DP_OK;
}
dpuc_subhealth_threshold subhealth_threshold = { 0 };
subhealth_threshold.type = DPUC_XNET_TCP;
subhealth_threshold.plane = DPUC_DATA_PLANE;
subhealth_threshold.hop = 0;
subhealth_threshold.upgradeTimeNs = (uint64_t)g_mes.profile.upgrade_time_ms * NS_PER_MS;
subhealth_threshold.degradeTimeNs = (uint64_t)g_mes.profile.degrade_time_ms * NS_PER_MS;
int32_t ret = mes_global_handle()->dpuc_set_subhealth_threshold(subhealth_threshold, __FUNCTION__);
if (ret != DP_OK) {
OG_LOG_RUN_ERR("Set subhealth threshhold failed (%d).", ret);
return ret;
}
OG_LOG_RUN_INF("Set subhealth threshhold success, upgrade time (%lu ns), degrade time (%lu ns).",
subhealth_threshold.upgradeTimeNs, subhealth_threshold.degradeTimeNs);
return DP_OK;
}
status_t mes_uc_set_process_config(void)
{
dpuc_necessary_config_param_t dpuc_config_para = { 1024, 64, 0, 10240, 10240, 1024, 1024, 1024, 1024 };
int32 ret = mes_global_handle()->dpuc_process_set_config(&dpuc_config_para, NULL, __FUNCTION__);
if (ret == DP_OK || ret == DPUC_RESULT_PARAM_REPEAT_SET) {
return OG_SUCCESS;
}
OG_LOG_RUN_ERR("Set dpuc process config failed (%d).", ret);
return OG_ERROR;
}
static int32_t mes_uc_decode_kmc_pwd(char *pass, uint32 pass_len, char *plain, uint32_t max_key_len, uint32 *plain_len)
{
if (EVP_DecodeBlock((uchar *)plain, (uchar *)pass, pass_len) == OG_ERROR) {
return OG_ERROR;
}
return OG_SUCCESS;
}
static int32_t mes_uc_get_crt_file_path(uint32_t inst_id, char *pub_key_file, uint32_t *pub_key_file_len)
{
if (inst_id >= OG_MAX_INSTANCES) {
OG_LOG_RUN_ERR("[mes]: crt inst_id is invalid %u", inst_id);
return OG_ERROR;
}
PRTS_RETURN_IFERR(snprintf_s(pub_key_file, DPUC_MAX_FILE_NAME_LEN, DPUC_MAX_FILE_NAME_LEN, "%s/mes.crt",
mes_get_ssl_auth_file()->cert_dir));
*pub_key_file_len = DPUC_MAX_FILE_NAME_LEN;
return OG_SUCCESS;
}
static int32_t init_xnet_dpuc(mes_uc_config_t *uc_config)
{
OG_RETURN_IFERR(uc_init_lib());
OG_RETURN_IFERR(dsw_init_lib());
OG_RETURN_IFERR(umm_init_lib());
int32_t ret = DP_ERROR;
dpuc_msg_recv_s msg_recv_func = { mes_uc_msg_recv_func, NULL };
dpuc_comm_mgr_param commMgrParam = { 2048, 2048, MY_PID, uc_config->lsid, 0 };
if (!g_enable_dbstor) {
ret = mes_global_handle()->dpumm_set_config_path("/home/regress/oGRACKernel/pkg/test/mes_test/", __FUNCTION__,
__LINE__);
} else {
ret = mes_global_handle()->dpumm_set_config_path(g_mes.profile.dpumm_config_path, __FUNCTION__, __LINE__);
}
if (ret != DP_OK) {
OG_LOG_RUN_ERR("umm set failed (%d).", ret);
return DP_ERROR;
}
OG_LOG_DEBUG_INF("umm set success.");
if (mes_global_handle()->dsw_core_init(NULL, 0, NULL) != 0) {
OG_LOG_RUN_ERR("[mes] dsw_core_init failed");
return OG_ERROR;
}
if (g_mes.profile.channel_version != OG_INVALID_ID64) {
(void)mes_global_handle()->dpuc_xnet_set_process_ver(g_mes.profile.channel_version);
OG_LOG_RUN_INF("mes set channel version=%lld.", g_mes.profile.channel_version);
}
OG_RETURN_IFERR(mes_uc_set_process_config());
dpuc_security_cert_info_t dpuc_link_cert_info;
dpuc_link_cert_info.security_cert_switch = g_mes.profile.use_ssl;
dpuc_link_cert_info.user_id = (uint32_t)g_mes.profile.inst_id;
PRTS_RETURN_IFERR(snprintf_s(dpuc_link_cert_info.pri_key_file, DPUC_MAX_FILE_NAME_LEN, DPUC_MAX_FILE_NAME_LEN,
mes_get_ssl_auth_file()->key_file));
PRTS_RETURN_IFERR(snprintf_s(dpuc_link_cert_info.pub_key_file, DPUC_MAX_FILE_NAME_LEN, DPUC_MAX_FILE_NAME_LEN,
mes_get_ssl_auth_file()->cert_file));
PRTS_RETURN_IFERR(snprintf_s(dpuc_link_cert_info.pri_key_pass_file, DPUC_MAX_FILE_NAME_LEN, DPUC_MAX_FILE_NAME_LEN,
mes_get_ssl_auth_file()->pass_file));
dpuc_link_cert_info.get_pub_key_func = mes_uc_get_crt_file_path;
dpuc_link_cert_info.kmca_decrypt_func = mes_uc_decode_kmc_pwd;
mes_global_handle()->dpuc_set_security_cert_info(&dpuc_link_cert_info, __FUNCTION__, MY_PID);
uc_config->com_mgr = mes_global_handle()->dpuc_all_init(&commMgrParam, __FUNCTION__);
if (uc_config->com_mgr == NULL) {
OG_LOG_RUN_ERR("Init dpuc failed, pid=%d, lsid=0x%x.", commMgrParam.usPid, commMgrParam.uiServiceId);
return DP_ERROR;
}
OG_LOG_RUN_INF("Init dpuc success, pid=%d, lsid=0x%x.", commMgrParam.usPid, commMgrParam.uiServiceId);
MES_UC_RETURN_IFERR(create_and_reg_eid(uc_config, &msg_recv_func));
MES_UC_RETURN_IFERR(set_subhealth_threshold());
dpuc_datamsg_mem_ops data_ops;
data_ops.pfnReqAllocMsgMem = NULL;
data_ops.pfnRspAllocMsgMem = NULL;
data_ops.pfnFreeMsgMem = NULL;
data_ops.uiSendDataMsgNumReserve = DPUC_DATA_MSG_RESERVE;
data_ops.uiSendDatamsgNumMax = DPUC_DATA_MSG_MAX;
data_ops.uiRecvDataMsgNumReserve = DPUC_DATA_MSG_RESERVE;
data_ops.uiRecvDatamsgNumMax = DPUC_DATA_MSG_MAX;
ret = mes_global_handle()->dpuc_msgmem_reg_integrate(uc_config->eid_obj, NULL, &data_ops, __FUNCTION__);
if (ret != DP_OK) {
OG_LOG_RUN_ERR("Set concurrent info failed.");
return ret;
}
OG_LOG_DEBUG_INF("Set concurrent info success.");
MES_UC_RETURN_IFERR(create_reactor(uc_config));
return DP_OK;
}
static status_t mes_uc_connect_init_addr(dpuc_addr eid_addr[], char *ip, uint16 port, uint32 *eid_num)
{
uint32 ip_cnt = 0;
char ip_addrs[CM_INST_MAX_IP_NUM][CM_MAX_IP_LEN] = { { 0 } };
if (cm_parse_lsnr_addr(ip, (uint32)strlen(ip), &ip_cnt, ip_addrs) != OG_SUCCESS) {
OG_LOG_RUN_ERR("mes uc connectinit addr failed, ip=%s, port=%u, ip_cnt=%u.", ip, port, ip_cnt);
return OG_ERROR;
}
*eid_num = ip_cnt;
for (int i = 0; i < ip_cnt; i++) {
eid_addr[i].AddrFamily = (g_mes.profile.pipe_type == CS_TYPE_UC ? DPUC_ADDR_FAMILY_IPV4
: DPUC_ADDR_FAMILY_IPV4_RDMA);
eid_addr[i].PlaneType = DPUC_DATA_PLANE;
char listen_ip[CM_MAX_IP_LEN] = { 0 };
if (cm_check_ip_valid(ip_addrs[i])) {
OG_RETURN_IFERR(memcpy_s(listen_ip, CM_MAX_IP_LEN, ip_addrs[i], CM_MAX_IP_LEN));
} else {
OG_RETURN_IFERR(cm_domain_to_ip(ip_addrs[i], listen_ip) != OG_SUCCESS);
}
OG_LOG_RUN_INF("listen ip %s", listen_ip);
PRTS_RETURN_IFERR(sprintf_s(eid_addr[i].Url, DPUC_URL_LEN, "%s:%u", listen_ip, port));
}
return OG_SUCCESS;
}
static int32_t mes_uc_server(mes_uc_config_t *uc_config, char *ip, uint16 port)
{
uint32 ip_cnt = 0;
dpuc_addr eid_addr[CM_INST_MAX_IP_NUM];
if (mes_uc_connect_init_addr(eid_addr, ip, port, &ip_cnt) != OG_SUCCESS) {
OG_LOG_RUN_ERR("init client eid addr failed ip(%s) ip_cnt(%u) port(%d).", ip, ip_cnt, port);
return OG_ERROR;
}
int32_t ret = mes_global_handle()->dpuc_set_src_eid_addr(uc_config->eid_obj, eid_addr, ip_cnt, DPUC_ADDR_SERVER,
__FUNCTION__);
if (ret != DP_OK) {
OG_LOG_RUN_ERR("mes set src eid addr failed, eid=0x%lx url=%s ip_cnt=%u.", uc_config->eid, eid_addr[0].Url,
ip_cnt);
return ret;
}
OG_LOG_RUN_INF("mes set src server eid addr success, eid=0x%lx, ip_cnt=%u.", uc_config->eid, ip_cnt);
return OG_SUCCESS;
}
static status_t mes_uc_create_link(uint32 inst_id, dpuc_addr *client_eid_addr, dpuc_addr *server_eid_addr)
{
int32_t ret;
OG_LOG_RUN_INF("Instance %d eid=0x%lx, url=%s to connect instance %d, eid=0x%lx, url=%s.", g_mes.profile.inst_id,
g_mes_uc_config.eid, client_eid_addr->Url, inst_id, g_mes_uc_config.dst_eid[inst_id],
server_eid_addr->Url);
dpuc_conn_params_t con_param = { 0 };
con_param.pri = 0;
con_param.hop = 0;
con_param.time_out = 0;
con_param.runMode = DPUC_PERSISTENT_CONN;
con_param.recovery_pri = DPUC_CONN_RECVOERY_L;
con_param.pSrcAddr = client_eid_addr;
con_param.pDstAddr = server_eid_addr;
con_param.kaInterval = 1;
con_param.kaTimeoutTimes = MES_UC_XNET_TIMEOUT_TIMES;
for (uint32 i = 0; i < g_mes.profile.channel_num; i++) {
ret = mes_global_handle()->dpuc_link_create_with_addr(g_mes_uc_config.eid_obj, g_mes_uc_config.dst_eid[inst_id],
&con_param, __FUNCTION__);
if (ret != DP_OK) {
OG_LOG_RUN_ERR("To intance %d create link failed.ret = %d", inst_id, ret);
return OG_ERROR;
}
}
g_mes_uc_channel_status[inst_id].is_allow_msg_transfer = OG_TRUE;
OG_LOG_RUN_INF("Call create link success.");
return OG_SUCCESS;
}
status_t mes_uc_connect(uint32 inst_id)
{
if (inst_id == g_mes.profile.inst_id || inst_id >= OG_MAX_INSTANCES) {
OG_LOG_RUN_ERR("connect id is invalid %d", inst_id);
return OG_ERROR;
}
uint32 lsid = g_mes.profile.inst_lsid[inst_id];
int32_t ret = mes_global_handle()->dpuc_eid_make(NORMAL_TYPE, MY_PID, 0, lsid, &g_mes_uc_config.dst_eid[inst_id],
__FUNCTION__);
if (ret != DP_OK) {
OG_LOG_RUN_ERR("Set dst eid failed (%d).", ret);
return OG_ERROR;
}
dpuc_addr client_eid_addr[CM_INST_MAX_IP_NUM];
char *lsnr_host = MES_HOST_NAME(g_mes.profile.inst_id);
uint16 port = g_mes.profile.inst_arr[g_mes.profile.inst_id].port;
uint32 client_ip_cnt = 0;
if (mes_uc_connect_init_addr(client_eid_addr, lsnr_host, port, &client_ip_cnt) != OG_SUCCESS) {
OG_LOG_RUN_ERR("init client eid addr failed ip(%s) ip_cnt(%u).", lsnr_host, client_ip_cnt);
return OG_ERROR;
}
dpuc_addr server_eid_addr[CM_INST_MAX_IP_NUM];
lsnr_host = MES_HOST_NAME(inst_id);
port = g_mes.profile.inst_arr[inst_id].port;
uint32 server_ip_cnt = 0;
if (mes_uc_connect_init_addr(server_eid_addr, lsnr_host, port, &server_ip_cnt) != OG_SUCCESS) {
OG_LOG_RUN_ERR("init server eid addr failed ip(%s) ip_cnt(%u).", lsnr_host, server_ip_cnt);
return OG_ERROR;
}
ret = mes_global_handle()->dpuc_set_dst_eid_addr(g_mes_uc_config.com_mgr, g_mes_uc_config.dst_eid[inst_id],
server_eid_addr, server_ip_cnt, __FUNCTION__);
if (ret != DP_OK) {
OG_LOG_RUN_ERR("mes set dst inst_id(%d) eid addr failed=%d, ip_cnt=%u.", inst_id, ret, server_ip_cnt);
return OG_ERROR;
}
if (client_ip_cnt != server_ip_cnt) {
OG_LOG_RUN_WAR("mes uc connect parse ip abnormal, ip cnt info[%u - %u].", client_ip_cnt, server_ip_cnt);
}
for (int i = 0; i < MIN(server_ip_cnt, client_ip_cnt); i++) {
if (mes_uc_create_link(inst_id, &client_eid_addr[i], &server_eid_addr[i]) != OG_SUCCESS) {
OG_LOG_RUN_ERR("mes connect to inst(%u) create link(%u) failed.", inst_id, i);
return OG_ERROR;
}
}
mes_uc_conn_t *conn = &g_mes_uc_channel_status[inst_id];
cm_thread_lock(&conn->lock);
if (conn->uc_channel_state == MES_CHANNEL_CLOSED) {
conn->uc_channel_state = MES_CHANNEL_UNCONNECTED;
}
cm_thread_unlock(&conn->lock);
OG_LOG_RUN_INF("mes uc inst(%u) connect to inst(%u) create (%u:%u) link success.", g_mes.profile.inst_id, inst_id,
server_ip_cnt, client_ip_cnt);
return OG_SUCCESS;
}
void mes_uc_disconnect(uint32 inst_id)
{
if (inst_id >= OG_MAX_INSTANCES) {
OG_LOG_RUN_ERR("inst_id out of range OG_MAX_INSTANCES.");
return;
}
mes_uc_disconnect_async(inst_id);
uint32 lsid = g_mes.profile.inst_lsid[inst_id];
if (cm_wait_cond(&g_uc_io_finish_cond, MES_DISCONNECT_TIMEOUT)) {
OG_LOG_RUN_INF("mes uc disconnect sync dst_lsid 0x%x finish.", lsid);
return;
}
OG_LOG_RUN_WAR("mes uc disconnect sync dst_lsid 0x%x timeout %d ms.", lsid, MES_DISCONNECT_TIMEOUT);
}
void mes_uc_disconnect_async(uint32 inst_id)
{
if (inst_id >= OG_MAX_INSTANCES) {
OG_LOG_RUN_ERR("inst_id out of range OG_MAX_INSTANCES.");
return;
}
g_mes_uc_channel_status[inst_id].is_allow_msg_transfer = OG_FALSE;
uint32 lsid = g_mes.profile.inst_lsid[inst_id];
int32_t ret = DP_ERROR;
ret = mes_global_handle()->dpuc_qlink_close(lsid, DPUC_DESTROY_LINK, DPUC_DATA_PLANE, __FUNCTION__);
if (ret != DP_OK) {
OG_LOG_RUN_ERR("Disconnect dst_lsid 0x%x failed.", lsid);
return;
}
mes_uc_conn_t *conn = &g_mes_uc_channel_status[inst_id];
cm_thread_lock(&conn->lock);
conn->uc_channel_state = MES_CHANNEL_CLOSED;
cm_thread_unlock(&conn->lock);
OG_LOG_RUN_INF("Disconnect dst_lsid 0x%x success.", lsid);
}
bool32 mes_uc_connection_ready(uint32 inst_id)
{
mes_uc_conn_t *conn = &g_mes_uc_channel_status[inst_id];
return ((conn->uc_channel_state != MES_CHANNEL_UNCONNECTED) && (conn->is_allow_msg_transfer == OG_TRUE));
}
mes_channel_stat_t mes_uc_get_channel_state(uint32 inst_id)
{
return g_mes_uc_channel_status[inst_id].uc_channel_state;
}
static status_t mes_uc_lsnr(void)
{
char *lsnr_host = MES_HOST_NAME(g_mes.profile.inst_id);
uint16 port = g_mes.profile.inst_arr[g_mes.profile.inst_id].port;
if (mes_uc_server(&g_mes_uc_config, lsnr_host, port) != DP_OK) {
OG_LOG_RUN_ERR("mes_start_lsnr failed.");
return OG_ERROR;
}
OG_LOG_DEBUG_INF("mes_start_lsnr suceess.");
return OG_SUCCESS;
}
static void mes_init_uc_channel_status(void)
{
uint32 i;
mes_uc_conn_t *conn;
for (i = 0; i < OG_MAX_INSTANCES; i++) {
conn = &g_mes_uc_channel_status[i];
conn->uc_channel_state = MES_CHANNEL_CLOSED;
conn->is_allow_msg_transfer = OG_FALSE;
cm_init_thread_lock(&conn->lock);
}
g_mes_uc_channel_status[g_mes.profile.inst_id].is_allow_msg_transfer = OG_TRUE;
}
static uint64 mes_uc_get_reconn_bitmap()
{
uint64 reconn_bitmap = 0;
for (uint32 i = 0; i < g_mes.profile.inst_count; i++) {
if (i == g_mes.profile.inst_id) {
continue;
}
if (rc_bitmap64_exist(&g_channel_reconn_bits, i)) {
rc_bitmap64_set(&reconn_bitmap, i);
rc_bitmap64_clear(&g_channel_reconn_bits, i);
}
}
OG_LOG_RUN_INF("mes uc should reconnect, reconn_bitmap = %llu", reconn_bitmap);
return reconn_bitmap;
}
static void mes_channel_check_thread(thread_t *thread)
{
while (!thread->closed) {
OG_LOG_RUN_INF("mes channel check is running.");
uint64 reconn_bitmap = mes_uc_get_reconn_bitmap();
if (reconn_bitmap == 0) {
cm_wait_cond_no_timeout(&g_reconn_thread_cond);
OG_LOG_RUN_INF("cm wait cond no timeout success.");
continue;
}
for (uint32 i = 0; i < g_mes.profile.inst_count; ++i) {
if (i == g_mes.profile.inst_id) {
continue;
}
if (!MES_SHOULD_RECONN(reconn_bitmap, i)) {
continue;
}
if (mes_uc_connect(i) != OG_SUCCESS) {
OG_LOG_RUN_ERR("reconnect failed, inst_id = %d", i);
continue;
}
OG_LOG_RUN_INF("reconnect success, inst_id = %d", i);
}
}
}
status_t mes_init_uc(void)
{
uint32 i;
for (i = 0; i < OG_MES_MAX_REACTOR_THREAD_NUM; i++) {
init_msgqueue(&g_mes_uc_recv_thead[i].msg_queue);
cm_init_thread_lock(&g_mes_uc_recv_thead[i].lock);
g_mes_uc_recv_thead[i].thread_ready = OG_TRUE;
}
mes_init_uc_channel_status();
mes_init_mq_local_queue();
OG_RETURN_IFERR(mes_init_message_pool());
if (!g_enable_dbstor) {
char log_path[OG_MAX_PATH_LEN];
PRTS_RETURN_IFERR(sprintf_s(log_path, OG_MAX_PATH_LEN, "%s", "/home/ogracdba"));
if (init_dpuc_log((char *)log_path) != DP_OK) {
OG_LOG_RUN_ERR("uc log initialize failed.");
return OG_ERROR;
}
OG_LOG_RUN_INF("uc log initialize successs.");
}
g_mes_uc_config.lsid = g_mes.profile.inst_lsid[g_mes.profile.inst_id];
OG_LOG_DEBUG_INF("mes uc config lsid = 0x%x", g_mes_uc_config.lsid);
if (init_xnet_dpuc(&g_mes_uc_config) != DP_OK) {
OG_LOG_RUN_ERR("UC config initialize failed.");
return OG_ERROR;
}
OG_LOG_DEBUG_INF("UC config initialize success.");
OG_RETURN_IFERR(mes_uc_lsnr());
cm_init_cond(&g_reconn_thread_cond);
cm_init_cond(&g_uc_io_finish_cond);
g_channel_reconn_bits = 0;
if (cm_create_thread(mes_channel_check_thread, 0, NULL, &g_mes_channel_check_thread) != OG_SUCCESS) {
OG_LOG_RUN_ERR("mes create channel check thread failed.");
return OG_ERROR;
}
return OG_SUCCESS;
}