* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* mes_func.c
*
*
* IDENTIFICATION
* src/mec/mes_func.c
*
* -------------------------------------------------------------------------
*/
#include "mes_log_module.h"
#include "cm_ip.h"
#include "cm_memory.h"
#include "cm_spinlock.h"
#include "cm_sync.h"
#include "cs_tcp.h"
#include "dtc_trace.h"
#include "rc_reform.h"
#include "mes_uc.h"
#include "mes_tcp.h"
#include "mes_func.h"
#include "knl_database.h"
mes_instance_t g_mes;
mes_stat_t g_mes_stat[MES_CMD_CEIL];
char g_mes_cpu_info[OG_MES_MAX_CPU_STR];
mes_elapsed_stat_t g_mes_elapsed_stat;
message_timeout_check_func g_message_need_timeout_early = NULL;
uint64 g_start_time;
int64 g_get_count = 0;
int64 g_release_count = 0;
mes_connect_t g_connect_func;
mes_disconnect_t g_disconnect_func;
mes_async_disconnect_t g_disconnect_async_func;
mes_send_data_t g_send_func;
mes_send_data_t g_cms_send_func;
mes_send_bufflist_t g_send_bufflist_func;
mes_release_buf_t g_release_buf_func;
mes_connection_ready_t g_conn_ready_func;
mes_alloc_msgitem_t g_alloc_msgitem_func;
bool32 g_enable_dbstor = OG_FALSE;
ssl_auth_file_t g_mes_ssl_auth_file = {0};
#define MES_CONNECT(inst_id) g_connect_func(inst_id)
#define MES_DISCONNECT(inst_id) g_disconnect_func(inst_id)
#define MES_DISCONNECT_ASYNC(inst_id) g_disconnect_async_func(inst_id)
#define MES_SEND_DATA(data) g_send_func(data)
#define MES_CMS_SEND_DATA(data) g_cms_send_func(data)
#define MES_SEND_BUFFLIST(buff_list) g_send_bufflist_func(buff_list)
#define MES_RELEASE_BUFFER(buffer) g_release_buf_func(buffer)
#define MES_CONNETION_READY(inst_id) g_conn_ready_func(inst_id)
#define MES_ALLOC_MSGITEM(queue) g_alloc_msgitem_func(queue)
#define MES_FORMAT_LOG_LENGTH (1024)
#define MES_GET_BUF_TRY_TIMES (50)
#define MES_SESSION_TO_CHANNEL_ID(sid) (uint8)((sid) % g_mes.profile.channel_num)
mes_instance_t *get_g_mes(void)
{
return &g_mes;
}
mes_stat_t *get_g_mes_stat(void)
{
return g_mes_stat;
}
char *get_g_mes_cpu_info(void)
{
return g_mes_cpu_info;
}
static bool32 mes_message_need_timeout(void)
{
if (g_message_need_timeout_early == NULL) {
return OG_FALSE;
}
return g_message_need_timeout_early();
}
void mes_set_message_timeout_check_func(message_timeout_check_func func)
{
OG_LOG_DEBUG_INF("[mes] set message timeout check function successful.");
g_message_need_timeout_early = func;
}
static inline status_t mes_log_format_buf(char *buf, const char *format, ...)
{
va_list args;
va_start(args, format);
PRTS_RETURN_IFERR(snprintf_s(buf, MES_FORMAT_LOG_LENGTH, MES_FORMAT_LOG_LENGTH - 1, format, args));
va_end(args);
return OG_SUCCESS;
}
#define MES_LOG_WAR_HEAD_EXX(head, format, ...) \
do { \
char buf[MES_FORMAT_LOG_LENGTH]; \
mes_log_format_buf(buf, format, ##__VA_ARGS__); \
OG_LOG_RUN_WAR("[mes] %s: %s. cmd=%u, rsn=%u, src_inst=%u, dst_inst=%u, src_sid=%u, dst_sid=%u.", \
(char *)__func__, buf, head->cmd, head->rsn, head->src_inst, head->dst_inst, head->src_sid, \
head->dst_sid); \
} while (0)
#define MES_STAT_SEND(head) \
do { \
mes_send_stat((head)->cmd); \
MES_LOG_HEAD(head); \
} while (0)
#define MES_STAT_SEND_FAIL(head) \
do { \
mes_send_fail_stat((head)->cmd); \
MES_LOG_HEAD_FAIL(head); \
} while (0)
static void mes_consume_time_init(void)
{
for (int j = 0; j < MES_CMD_CEIL; j++) {
g_mes_elapsed_stat.time_consume_stat[j].cmd = j;
g_mes_elapsed_stat.time_consume_stat[j].group_id = 0;
for (int i = 0; i < MES_TIME_CEIL; i++) {
g_mes_elapsed_stat.time_consume_stat[j].time[i] = 0;
g_mes_elapsed_stat.time_consume_stat[j].count[i] = 0;
g_mes_elapsed_stat.time_consume_stat[j].non_empty = OG_FALSE;
}
}
return;
}
void mes_init_stat(void)
{
for (int i = 0; i < MES_CMD_CEIL; i++) {
g_mes_stat[i].cmd = i;
g_mes_stat[i].send_count = 0;
g_mes_stat[i].send_fail_count = 0;
g_mes_stat[i].recv_count = 0;
g_mes_stat[i].local_count = 0;
g_mes_stat[i].dealing_count = 0;
g_mes_stat[i].non_empty = OG_FALSE;
}
for (int i = 0; i < MES_LOGGING_CEIL; i++) {
g_mes.mes_ctx.logging_time[i] = 0;
}
mes_consume_time_init();
return;
}
static inline void mes_send_stat(mes_command_t cmd)
{
cm_atomic_inc(&(g_mes_stat[cmd].send_count));
}
static inline void mes_send_fail_stat(mes_command_t cmd)
{
cm_atomic_inc(&(g_mes_stat[cmd].send_fail_count));
}
static inline void mes_local_stat(mes_command_t cmd)
{
cm_atomic_inc(&(g_mes_stat[cmd].local_count));
}
static inline void mes_add_dealing_count(mes_command_t cmd)
{
cm_atomic32_inc(&(g_mes_stat[cmd].dealing_count));
}
void mes_dec_dealing_count(mes_command_t cmd)
{
if (cmd >= MES_CMD_CEIL) {
OG_LOG_RUN_ERR("index out of MES_CMD_CEI!");
return;
}
cm_atomic32_dec(&(g_mes_stat[cmd].dealing_count));
}
void mes_release_message_buf(const char *msg_buf)
{
CM_POINTER(msg_buf);
mes_message_head_t *head = (mes_message_head_t *)msg_buf;
if (head->cmd < MES_CMD_CEIL) {
mes_release_buf_stat(msg_buf);
mes_dec_dealing_count(head->cmd);
}
mes_free_buf_item((char *)msg_buf);
return;
}
void mes_get_message_buf(mes_message_t *msg, mes_message_head_t *head)
{
char *msg_buf;
uint64 stat_time = 0;
mes_get_consume_time_start(&stat_time);
msg_buf = mes_alloc_buf_item(head->size);
MES_MESSAGE_ATTACH(msg, msg_buf);
mes_consume_with_time(head->cmd, MES_TIME_GET_BUF, stat_time);
return;
}
static inline void mes_recv_message_stat(mes_message_t *msg)
{
cm_atomic_inc(&(g_mes_stat[msg->head->cmd].recv_count));
MES_LOG_WITH_MSG(msg);
}
void mes_process_message(dtc_msgqueue_t *my_queue, uint32 recv_idx, mes_message_t *msg, uint64 start_time)
{
dtc_msgitem_t *msgitem;
mes_recv_message_stat(msg);
mes_add_dealing_count(msg->head->cmd);
if (g_mes.is_enqueue[msg->head->cmd]) {
msgitem = MES_ALLOC_MSGITEM(my_queue);
if (msgitem == NULL) {
mes_release_message_buf(msg->buffer);
OG_LOG_RUN_ERR("[mes]: alloc msgitem failed.");
return;
}
msgitem->msg.head = msg->head;
msgitem->msg.buffer = msg->buffer;
msgitem->start_time = start_time;
mes_put_msgitem(msgitem);
mes_consume_with_time(msg->head->cmd, MES_TIME_PUT_QUEUE, start_time);
return;
}
g_mes.proc((g_mes.profile.work_thread_num + recv_idx), msg);
mes_consume_with_time(msg->head->cmd, MES_TIME_PROC_FUN, start_time);
return;
}
static status_t mes_set_addr(uint32 inst_id, char *ip, uint16 port)
{
errno_t ret;
if (inst_id >= OG_MAX_INSTANCES) {
OG_THROW_ERROR_EX(ERR_MES_PARAMETER, "node_id %u is invalid.", inst_id);
return OG_ERROR;
}
ret = strncpy_s(g_mes.profile.inst_arr[inst_id].ip, OG_MAX_INST_IP_LEN, ip, strlen(ip));
if (ret != EOK) {
OG_THROW_ERROR(ERR_SYSTEM_CALL, (ret));
return OG_ERROR;
}
g_mes.profile.inst_arr[inst_id].port = port;
return OG_SUCCESS;
}
static void mes_clear_addr(uint32 node_id)
{
if (node_id >= OG_MAX_INSTANCES) {
return;
}
g_mes.profile.inst_arr[node_id].ip[0] = '\0';
g_mes.profile.inst_arr[node_id].port = 0;
return;
}
status_t mes_connect(uint32 inst_id, char *ip, uint16 port)
{
mes_conn_t *conn;
if ((inst_id == g_mes.profile.inst_id) || (inst_id >= OG_MAX_INSTANCES)) {
OG_THROW_ERROR_EX(ERR_MES_PARAMETER, "[mes]: connect inst_id %u failed, current inst_id %u.", inst_id,
g_mes.profile.inst_id);
return OG_ERROR;
}
conn = &g_mes.mes_ctx.conn_arr[inst_id];
cm_thread_lock(&conn->lock);
if (g_mes.mes_ctx.conn_arr[inst_id].is_connect) {
cm_thread_unlock(&conn->lock);
OG_THROW_ERROR_EX(ERR_MES_ALREADY_CONNECT, "[mes]: dst instance %u.", inst_id);
return OG_ERROR;
}
if (mes_set_addr(inst_id, ip, port) != OG_SUCCESS) {
cm_thread_unlock(&conn->lock);
OG_LOG_RUN_ERR("[mes]: mes_set_addr failed.");
return OG_ERROR;
}
if (MES_CONNECT(inst_id) != OG_SUCCESS) {
cm_thread_unlock(&conn->lock);
OG_LOG_RUN_ERR("[mes]: MES_CONNECT failed.");
return OG_ERROR;
}
g_mes.mes_ctx.conn_arr[inst_id].is_connect = OG_TRUE;
cm_thread_unlock(&conn->lock);
OG_LOG_RUN_INF("[mes] connect to instance %u, %s:%u.", inst_id, ip, port);
return OG_SUCCESS;
}
bool32 mes_connection_ready(uint32 inst_id)
{
if ((inst_id >= OG_MAX_INSTANCES) || (inst_id == g_mes.profile.inst_id)) {
OG_THROW_ERROR_EX(ERR_MES_PARAMETER, "[mes] connect inst_id %u is invalid, current inst_id %u.", inst_id,
g_mes.profile.inst_id);
return OG_FALSE;
}
return MES_CONNETION_READY(inst_id);
}
static inline void mes_wakeup(uint16 sid)
{
if (sid >= OG_MAX_MES_ROOMS) {
return;
}
mes_waiting_room_t *room = &g_mes.mes_ctx.waiting_rooms[sid];
cm_spin_lock(&room->lock, NULL);
(void)cm_atomic_set(&room->timeout, 0);
cm_spin_unlock(&room->lock);
}
void mes_wakeup_rooms(void)
{
OG_LOG_RUN_WAR("[mes] start wakeup all rooms.");
for (uint32 i = 0; i < OG_MAX_MES_ROOMS; i++) {
mes_wakeup(i);
}
OG_LOG_RUN_WAR("[mes] finish wakeup all rooms.");
}
status_t mes_reconnect(uint32 inst_id)
{
mes_conn_t *conn;
if (inst_id >= OG_MAX_INSTANCES || inst_id == g_mes.profile.inst_id) {
OG_THROW_ERROR_EX(ERR_MES_PARAMETER, "[mes]: mes_reconnect: inst_id %u is illegal.", inst_id);
return OG_ERROR;
}
bool32 need_disconnect = OG_TRUE;
conn = &g_mes.mes_ctx.conn_arr[inst_id];
if (!g_mes.mes_ctx.conn_arr[inst_id].is_connect) {
cm_thread_unlock(&conn->lock);
OG_LOG_RUN_INF("[mes] target node %u is not connect, no need disconnect.", inst_id);
need_disconnect = OG_FALSE;
}
if (need_disconnect) {
MES_DISCONNECT(inst_id);
}
OG_LOG_RUN_INF("[mes] mes reconnect to inst_id(%u), ip_addrs[%s], port[%u].",
inst_id, g_mes.profile.inst_arr[inst_id].ip, g_mes.profile.inst_arr[inst_id].port);
if (MES_CONNECT(inst_id) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[mes] conncect to instance %u failed.", inst_id);
}
g_mes.mes_ctx.conn_arr[inst_id].is_connect = OG_TRUE;
OG_LOG_RUN_INF("[mes] success reconnect node %u.", inst_id);
return OG_SUCCESS;
}
status_t mes_disconnect(uint32 inst_id, bool32 isSync)
{
mes_conn_t *conn;
if (inst_id >= OG_MAX_INSTANCES || inst_id == g_mes.profile.inst_id) {
OG_THROW_ERROR_EX(ERR_MES_PARAMETER, "[mes]: mes_disconnect: inst_id %u is illegal.", inst_id);
return OG_ERROR;
}
conn = &g_mes.mes_ctx.conn_arr[inst_id];
cm_thread_lock(&conn->lock);
if (!g_mes.mes_ctx.conn_arr[inst_id].is_connect) {
cm_thread_unlock(&conn->lock);
OG_LOG_RUN_INF("[mes] target node %u is not connect, no need disconnect.", inst_id);
return OG_SUCCESS;
}
if (isSync) {
MES_DISCONNECT(inst_id);
} else {
MES_DISCONNECT_ASYNC(inst_id);
}
mes_clear_addr(inst_id);
g_mes.mes_ctx.conn_arr[inst_id].is_connect = OG_FALSE;
cm_thread_unlock(&conn->lock);
OG_LOG_RUN_INF("[mes] success disconnect node %u.", inst_id);
return OG_SUCCESS;
}
static status_t mes_connect_by_profile(void)
{
uint32 i;
if (!g_mes.profile.conn_by_profile) {
return OG_SUCCESS;
}
for (i = 0; i < g_mes.profile.inst_count; i++) {
if (i == g_mes.profile.inst_id) {
continue;
}
if (mes_connect(i, g_mes.profile.inst_arr[i].ip, g_mes.profile.inst_arr[i].port) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[mes] conncect to instance %d failed.", i);
return OG_ERROR;
}
}
return OG_SUCCESS;
}
static void mes_disconnect_by_profile(void)
{
uint32 i;
for (i = 0; i < g_mes.profile.inst_count; i++) {
if ((i != g_mes.profile.inst_id) && g_mes.mes_ctx.conn_arr[i].is_connect) {
mes_disconnect(i, OG_TRUE);
}
}
return;
}
static void mes_clean_session_mutex(uint32 ceil)
{
uint32 i;
for (i = 0; i < ceil; i++) {
mes_mutex_destroy(&g_mes.mes_ctx.waiting_rooms[i].mutex);
mes_mutex_destroy(&g_mes.mes_ctx.waiting_rooms[i].broadcast_mutex);
}
}
static status_t mes_init_session_room()
{
uint32 i;
mes_waiting_room_t *room = NULL;
for (i = 0; i < OG_MAX_MES_ROOMS; i++) {
room = &g_mes.mes_ctx.waiting_rooms[i];
if (mes_mutex_create(&room->mutex) != OG_SUCCESS) {
mes_clean_session_mutex(i);
OG_LOG_RUN_ERR("mes_mutex_create %u failed.", i);
return OG_ERROR;
}
if (mes_mutex_create(&room->broadcast_mutex) != OG_SUCCESS) {
mes_clean_session_mutex(i);
OG_LOG_RUN_ERR("mes_mutex_create %u failed.", i);
return OG_ERROR;
}
OG_INIT_SPIN_LOCK(room->lock);
room->rsn = cm_random(OG_INVALID_ID32);
room->check_rsn = room->rsn;
(void)cm_atomic_set(&room->timeout, 0);
}
return OG_SUCCESS;
}
static void mes_init_conn(void)
{
uint32 i;
mes_conn_t *conn;
for (i = 0; i < OG_MAX_INSTANCES; i++) {
conn = &g_mes.mes_ctx.conn_arr[i];
conn->is_connect = OG_FALSE;
cm_init_thread_lock(&conn->lock);
}
return;
}
static status_t mes_register_func(void)
{
if (g_mes.profile.pipe_type == CS_TYPE_TCP) {
g_connect_func = mes_tcp_connect;
g_disconnect_func = mes_tcp_disconnect;
g_disconnect_async_func = mes_tcp_disconnect_async;
g_send_func = mes_tcp_send_data;
g_cms_send_func = mes_cms_tcp_send_data;
g_send_bufflist_func = mes_tcp_send_bufflist;
if (!g_mes.profile.use_ssl) {
g_conn_ready_func = mes_tcp_connection_ready;
} else {
g_conn_ready_func = mes_ssl_connection_ready;
}
g_alloc_msgitem_func = mes_alloc_msgitem_nolock;
} else if (g_mes.profile.pipe_type == CS_TYPE_UC || g_mes.profile.pipe_type == CS_TYPE_UC_RDMA) {
g_cms_send_func = mes_uc_send_data;
g_connect_func = mes_uc_connect;
g_disconnect_func = mes_uc_disconnect;
g_disconnect_async_func = mes_uc_disconnect_async;
g_send_func = mes_uc_send_data;
g_send_bufflist_func = mes_uc_send_bufflist;
g_conn_ready_func = mes_uc_connection_ready;
g_alloc_msgitem_func = mes_alloc_msgitem_nolock;
} else {
OG_THROW_ERROR_EX(ERR_MES_PARAMETER, "pipe_type %u is invalid", g_mes.profile.pipe_type);
return OG_ERROR;
}
mes_init_send_recv_buf_pool();
return OG_SUCCESS;
}
status_t mes_init_pipe(void)
{
if (g_mes.profile.pipe_type == CS_TYPE_TCP) {
if (mes_init_tcp() != OG_SUCCESS) {
OG_LOG_RUN_ERR("mes_init_tcp failed.");
return OG_ERROR;
}
} else if (g_mes.profile.pipe_type == CS_TYPE_UC || g_mes.profile.pipe_type == CS_TYPE_UC_RDMA) {
if (mes_init_uc() != OG_SUCCESS) {
OG_LOG_RUN_ERR("mes_init_uc failed.");
return OG_ERROR;
}
} else {
OG_THROW_ERROR_EX(ERR_MES_PARAMETER, "pipe_type %u is invalid", g_mes.profile.pipe_type);
return OG_ERROR;
}
return OG_SUCCESS;
}
static void mes_destroy_pipe(void)
{
if (g_mes.profile.pipe_type == CS_TYPE_TCP) {
mes_destroy_tcp();
} else if (g_mes.profile.pipe_type == CS_TYPE_UC || g_mes.profile.pipe_type == CS_TYPE_UC_RDMA) {
mes_destroy_uc();
} else {
OG_THROW_ERROR_EX(ERR_MES_PARAMETER, "pipe_type %u is invalid", g_mes.profile.pipe_type);
}
return;
}
static status_t mes_init_resource(void)
{
mes_init_stat();
mes_init_conn();
if (mes_init_session_room() != OG_SUCCESS) {
OG_LOG_RUN_ERR("mes_init_session_room failed.");
return OG_ERROR;
}
if (init_dtc_mq_instance() != OG_SUCCESS) {
OG_LOG_RUN_ERR("init_dtc_mq_instance failed.");
return OG_ERROR;
}
return OG_SUCCESS;
}
static status_t mes_init(void)
{
if (mes_register_func() != OG_SUCCESS) {
OG_LOG_RUN_ERR("[mes]: mes_register_func failed.");
return OG_ERROR;
}
if (mes_init_resource() != OG_SUCCESS) {
OG_LOG_RUN_ERR("[mes]: init resource failed.");
return OG_ERROR;
}
if (mes_init_pipe() != OG_SUCCESS) {
OG_LOG_RUN_ERR("[mes]: mes_init_pipe failed.");
return OG_ERROR;
}
if (mes_connect_by_profile() != OG_SUCCESS) {
OG_LOG_RUN_ERR("[mes]: mes_connect_by_profile failed.");
return OG_ERROR;
}
OG_LOG_RUN_INF("[mes] mes_init success.");
return OG_SUCCESS;
}
void mes_clean(void)
{
free_dtc_mq_instance();
mes_disconnect_by_profile();
mes_clean_session_mutex(OG_MAX_MES_ROOMS);
mes_destroy_pipe();
OG_LOG_RUN_INF("[mes] mes_clean.");
return;
}
status_t mes_set_process_lsid(mes_profile_t *profile)
{
if (profile->inst_count >= OG_MAX_INSTANCES) {
OG_THROW_ERROR_EX(ERR_MES_PARAMETER, "instinst_count_id %u is invalid, exceed max instance num %u.", profile->inst_count,
OG_MAX_INSTANCES);
return OG_ERROR;
}
for (int i = 0; i < profile->inst_count; i++) {
g_mes.profile.inst_lsid[i] = profile->inst_lsid[i];
}
return OG_SUCCESS;
}
status_t mes_set_instance_info(uint32 inst_id, uint32 inst_count, mes_addr_t *inst_arr)
{
errno_t ret;
if (inst_id >= OG_MAX_INSTANCES) {
OG_THROW_ERROR_EX(ERR_MES_PARAMETER, "inst_id %u is invalid, exceed max instance num %u.", inst_id,
OG_MAX_INSTANCES);
return OG_ERROR;
}
if (inst_count >= OG_MAX_INSTANCES) {
OG_THROW_ERROR_EX(ERR_MES_PARAMETER, "instinst_count_id %u is invalid, exceed max instance num %u.", inst_count,
OG_MAX_INSTANCES);
return OG_ERROR;
}
g_mes.profile.inst_id = inst_id;
g_mes.profile.inst_count = inst_count;
ret = memset_sp(g_mes.profile.inst_arr, (sizeof(mes_addr_t) * OG_MAX_INSTANCES), 0,
(sizeof(mes_addr_t) * OG_MAX_INSTANCES));
MEMS_RETURN_IFERR(ret);
for (int i = 0; i < inst_count; i++) {
if (mes_set_addr(i, inst_arr[i].ip, inst_arr[i].port) != OG_SUCCESS) {
OG_LOG_RUN_ERR("mes_set_addr node(%d) ip(%s) port(%u) failed.", i, inst_arr[i].ip, inst_arr[i].port);
return OG_ERROR;
}
}
return OG_SUCCESS;
}
void mes_set_pool_size(uint32 mes_pool_size)
{
if (mes_pool_size < OG_MES_MIN_POOL_SIZE) {
g_mes.profile.pool_size = OG_MES_MIN_POOL_SIZE;
OG_LOG_RUN_WAR("[mes] min pool size is %u.", OG_MES_MIN_POOL_SIZE);
} else if (mes_pool_size > OG_MES_MAX_POOL_SIZE) {
g_mes.profile.pool_size = OG_MES_MAX_POOL_SIZE;
OG_LOG_RUN_WAR("[mes] max pool size is %u.", OG_MES_MAX_POOL_SIZE);
} else {
g_mes.profile.pool_size = mes_pool_size;
}
OG_LOG_DEBUG_INF("[mes] set pool size %u.", g_mes.profile.pool_size);
return;
}
void mes_set_channel_num(uint32 channel_num)
{
if (channel_num < OG_MES_MIN_CHANNEL_NUM) {
g_mes.profile.channel_num = OG_MES_MIN_CHANNEL_NUM;
OG_LOG_RUN_WAR("[mes] min channel num is %u.", OG_MES_MIN_CHANNEL_NUM);
} else if (channel_num > OG_MES_MAX_CHANNEL_NUM) {
g_mes.profile.channel_num = OG_MES_MAX_CHANNEL_NUM;
OG_LOG_RUN_WAR("[mes] max channel num is %u.", OG_MES_MAX_CHANNEL_NUM);
} else {
g_mes.profile.channel_num = channel_num;
}
OG_LOG_DEBUG_INF("[mes] set channel num %u.", g_mes.profile.channel_num);
return;
}
static void mes_set_reactor_thread_num(uint32 reactor_thread_num)
{
g_mes.profile.reactor_thread_num = reactor_thread_num;
OG_LOG_DEBUG_INF("[mes] set reactor thread num %u.", g_mes.profile.reactor_thread_num);
}
void mes_set_work_thread_num(uint32 thread_num)
{
if (thread_num < MES_MIN_TASK_NUM) {
g_mes.profile.work_thread_num = MES_MIN_TASK_NUM;
OG_LOG_RUN_WAR("[mes] min work thread num is %u.", MES_MIN_TASK_NUM);
} else if (thread_num > MES_MAX_TASK_NUM) {
g_mes.profile.work_thread_num = MES_MAX_TASK_NUM;
OG_LOG_RUN_WAR("[mes] max work thread num is %u.", MES_MAX_TASK_NUM);
} else {
g_mes.profile.work_thread_num = thread_num;
}
OG_LOG_DEBUG_INF("[mes] set work thread num %u.", g_mes.profile.work_thread_num);
return;
}
static status_t mes_set_buffer_pool(mes_profile_t *profile)
{
uint32 pool_count = profile->buffer_pool_attr.pool_count;
if ((pool_count == 0) || (pool_count > MES_MAX_BUFFER_STEP_NUM)) {
OG_THROW_ERROR_EX(ERR_MES_PARAMETER, "[mes] pool_count %u is invalid, legal scope is [1, %u].", pool_count,
MES_MAX_BUFFER_STEP_NUM);
return OG_ERROR;
}
g_mes.profile.buffer_pool_attr.pool_count = pool_count;
for (uint32 i = 0; i < pool_count; i++) {
g_mes.profile.buffer_pool_attr.buf_attr[i] = profile->buffer_pool_attr.buf_attr[i];
}
return OG_SUCCESS;
}
status_t mes_set_profile(mes_profile_t *profile)
{
CM_POINTER(profile);
if (mes_set_instance_info(profile->inst_id, profile->inst_count, profile->inst_arr) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[mes]: mes_set_instance_info failed.");
return OG_ERROR;
}
if (mes_set_buffer_pool(profile) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[mes]: set buffer pool failed.");
return OG_ERROR;
}
if (mes_set_process_lsid(profile) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[mes]: set process lsid failed.");
return OG_ERROR;
}
g_mes.profile.pipe_type = profile->pipe_type;
mes_set_pool_size(profile->pool_size);
mes_set_channel_num(profile->channel_num);
mes_set_reactor_thread_num(profile->reactor_thread_num);
mes_set_work_thread_num(profile->work_thread_num);
g_mes.profile.conn_by_profile = profile->conn_by_profile;
g_mes.profile.channel_version = profile->channel_version;
g_mes.profile.upgrade_time_ms = profile->upgrade_time_ms;
g_mes.profile.degrade_time_ms = profile->degrade_time_ms;
g_mes.profile.set_cpu_affinity = profile->set_cpu_affinity;
g_mes.mq_ctx.task_num = g_mes.profile.work_thread_num;
g_mes.mq_ctx.group.assign_task_idx = 0;
g_mes.profile.is_init = OG_TRUE;
OG_LOG_RUN_INF("[mes] set profile finish.");
return OG_SUCCESS;
}
status_t mes_set_uc_dpumm_config_path(const char *home_path)
{
if (home_path == NULL) {
OG_LOG_RUN_ERR("[mes]: dpumm config path is null.");
return OG_ERROR;
}
int32 ret = sprintf_s(g_mes.profile.dpumm_config_path, sizeof(g_mes.profile.dpumm_config_path),
"%s/dbstor/conf/infra", home_path);
if (ret == -1) {
OG_LOG_RUN_ERR("[mes]: set dpumm config path failed.");
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t mes_startup(void)
{
if (mes_init() != OG_SUCCESS) {
mes_clean();
OG_LOG_RUN_ERR("mes_init failed.");
return OG_ERROR;
}
return OG_SUCCESS;
}
static inline void mes_set_bufflist(mes_bufflist_t *buff_list, uint32 idx, const void *buff, uint32 len)
{
buff_list->buffers[idx].buf = (char *)buff;
buff_list->buffers[idx].len = len;
}
static inline void mes_append_bufflist(mes_bufflist_t *buff_list, const void *buff, uint32 len)
{
buff_list->buffers[buff_list->cnt].buf = (char *)buff;
buff_list->buffers[buff_list->cnt].len = len;
buff_list->cnt = buff_list->cnt + 1;
}
status_t mes_check_msg_head(mes_message_head_t *head)
{
if (SECUREC_UNLIKELY(head->cmd >= MES_CMD_CEIL)) {
OG_THROW_ERROR_EX(ERR_MES_INVALID_CMD, "[mes][%s] cmd %u is illegal.", (char *)__func__, head->cmd);
MES_LOG_ERR_HEAD_EX(head, "cmd is illegal");
return OG_ERROR;
}
if (SECUREC_UNLIKELY(head->size > MES_512K_MESSAGE_BUFFER_SIZE)) {
OG_THROW_ERROR_EX(ERR_MES_ILEGAL_MESSAGE, "[mes][%s] size %u is excced.", (char *)__func__, head->size);
MES_LOG_ERR_HEAD_EX(head, "message length excced");
return OG_ERROR;
}
if (SECUREC_UNLIKELY(head->size < sizeof(mes_message_head_t))) {
OG_THROW_ERROR_EX(ERR_MES_ILEGAL_MESSAGE, "[mes][%s] size %u is too small.", (char *)__func__, head->size);
MES_LOG_ERR_HEAD_EX(head, "message length too small");
return OG_ERROR;
}
if (SECUREC_UNLIKELY(head->src_inst >= g_mes.profile.inst_count)) {
OG_THROW_ERROR_EX(ERR_MES_PARAMETER, "[mes][%s] src_inst %u is illegal.", (char *)__func__, head->src_inst);
MES_LOG_ERR_HEAD_EX(head, "src inst id is illegal");
return OG_ERROR;
}
if (SECUREC_UNLIKELY(head->dst_inst >= g_mes.profile.inst_count)) {
OG_THROW_ERROR_EX(ERR_MES_PARAMETER, "[mes][%s] dst_inst %u is illegal.", (char *)__func__, head->dst_inst);
MES_LOG_ERR_HEAD_EX(head, "dst inst id is illegal");
return OG_ERROR;
}
if (SECUREC_UNLIKELY((head->src_sid > OG_MAX_MES_ROOMS) && (head->src_sid != OG_INVALID_ID16))) {
OG_THROW_ERROR_EX(ERR_MES_PARAMETER, "[mes][%s] src_sid %u is illegal.", (char *)__func__, head->src_sid);
MES_LOG_ERR_HEAD_EX(head, "src session id is illegal");
return OG_ERROR;
}
return OG_SUCCESS;
}
static status_t mes_send_inter_buffer_list(mes_bufflist_t *buff_list)
{
errno_t err;
mes_message_t msg;
char *buffer;
uint32 pos = 0;
buffer = mes_alloc_buf_item(MES_MESSAGE_BUFFER_SIZE);
if (buffer == NULL) {
OG_LOG_RUN_ERR("[mes] mes_alloc_buf_item failed.");
return OG_ERROR;
}
for (int i = 0; i < buff_list->cnt; i++) {
err = memcpy_sp(buffer + pos, MES_MESSAGE_BUFFER_SIZE - pos, buff_list->buffers[i].buf,
buff_list->buffers[i].len);
if (err != EOK) {
mes_release_recv_buf(buffer);
OG_THROW_ERROR(ERR_SYSTEM_CALL, err);
return OG_ERROR;
}
pos += buff_list->buffers[i].len;
}
MES_MESSAGE_ATTACH(&msg, buffer);
if (mes_put_inter_msg(&msg) != OG_SUCCESS) {
mes_release_recv_buf(buffer);
MES_STAT_SEND_FAIL(msg.head);
OG_LOG_RUN_ERR("[mes] mes_put_inter_msg failed.");
return OG_ERROR;
}
mes_local_stat(msg.head->cmd);
mes_add_dealing_count(msg.head->cmd);
return OG_SUCCESS;
}
static status_t mes_send_inter_msg(const void *msg_data)
{
errno_t err;
mes_message_t msg;
char *buffer;
buffer = mes_alloc_buf_item(MES_MESSAGE_BUFFER_SIZE);
if (buffer == NULL) {
OG_LOG_RUN_ERR("buffer allocation failed.");
return OG_ERROR;
}
err = memcpy_sp(buffer, MES_MESSAGE_BUFFER_SIZE, msg_data, ((mes_message_head_t *)msg_data)->size);
if (err != EOK) {
mes_release_recv_buf(buffer);
OG_THROW_ERROR(ERR_SYSTEM_CALL, err);
return OG_ERROR;
}
MES_MESSAGE_ATTACH(&msg, buffer);
if (mes_put_inter_msg(&msg) != OG_SUCCESS) {
mes_release_recv_buf(buffer);
MES_STAT_SEND_FAIL(msg.head);
OG_LOG_RUN_ERR("[mes] mes_put_inter_msg failed.");
return OG_ERROR;
}
mes_local_stat(msg.head->cmd);
mes_add_dealing_count(msg.head->cmd);
return OG_SUCCESS;
}
status_t mes_cms_send_data(const void *msg_data)
{
uint64 start_stat_time = 0;
mes_message_head_t *head = (mes_message_head_t *)msg_data;
if (mes_check_msg_head(head) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[mes] mes check msg head failed.");
return OG_ERROR;
}
head->extend_size = 0;
if (g_mes.crc_check_switch) {
char *body = (char *)msg_data + sizeof(mes_message_head_t);
head->body_cks = mes_calc_cks(body, head->size - sizeof(mes_message_head_t));
head->head_cks = mes_calc_cks((char *)head + sizeof(head->head_cks), sizeof(mes_message_head_t) -
sizeof(head->head_cks));
}
if (head->dst_inst == g_mes.profile.inst_id) {
return mes_send_inter_msg(msg_data);
}
mes_get_consume_time_start(&start_stat_time);
MES_STAT_SEND(head);
if (MES_CMS_SEND_DATA(msg_data) != OG_SUCCESS) {
MES_LOGGING(MES_LOGGING_SEND, "send data from %u to %u failed, cmd=%u, rsn=%u.",
head->src_inst, head->dst_inst, head->cmd, head->rsn);
MES_STAT_SEND_FAIL(head);
return OG_ERROR;
}
mes_consume_with_time(head->cmd, MES_TIME_TEST_SEND, start_stat_time);
return OG_SUCCESS;
}
status_t mes_send_data(const void *msg_data)
{
uint64 start_stat_time = 0;
mes_message_head_t *head = (mes_message_head_t *)msg_data;
if (mes_check_msg_head(head) != OG_SUCCESS) {
return OG_ERROR;
}
head->extend_size = 0;
if (g_mes.crc_check_switch) {
char *body = (char *)msg_data + sizeof(mes_message_head_t);
head->body_cks = mes_calc_cks(body, head->size - sizeof(mes_message_head_t));
head->head_cks = mes_calc_cks((char *)head + sizeof(head->head_cks), sizeof(mes_message_head_t) -
sizeof(head->head_cks));
}
if (head->dst_inst == g_mes.profile.inst_id) {
return mes_send_inter_msg(msg_data);
}
mes_get_consume_time_start(&start_stat_time);
MES_STAT_SEND(head);
if (MES_SEND_DATA(msg_data) != OG_SUCCESS) {
MES_LOGGING(MES_LOGGING_SEND, "send data from %u to %u failed, cmd=%u, rsn=%u.",
head->src_inst, head->dst_inst, head->cmd, head->rsn);
MES_STAT_SEND_FAIL(head);
return OG_ERROR;
}
mes_consume_with_time(head->cmd, MES_TIME_TEST_SEND, start_stat_time);
return OG_SUCCESS;
}
status_t mes_send_data2(mes_message_head_t *head, const void *body)
{
uint64 start_stat_time = 0;
mes_bufflist_t buff_list;
if (mes_check_msg_head(head) != OG_SUCCESS) {
return OG_ERROR;
}
head->extend_size = 0;
if (g_mes.crc_check_switch) {
head->body_cks = mes_calc_cks((char *)body, head->size - sizeof(mes_message_head_t));
head->head_cks = mes_calc_cks((char *)head + sizeof(head->head_cks), sizeof(mes_message_head_t) -
sizeof(head->head_cks));
}
buff_list.cnt = 0;
mes_append_bufflist(&buff_list, head, sizeof(mes_message_head_t));
mes_append_bufflist(&buff_list, body, head->size - sizeof(mes_message_head_t));
if (head->dst_inst == g_mes.profile.inst_id) {
return mes_send_inter_buffer_list(&buff_list);
}
mes_get_consume_time_start(&start_stat_time);
MES_STAT_SEND(head);
if (MES_SEND_BUFFLIST(&buff_list) != OG_SUCCESS) {
MES_LOGGING(MES_LOGGING_SEND, "send data from %u to %u failed, cmd=%u, rsn=%u.",
head->src_inst, head->dst_inst, head->cmd, head->rsn);
MES_STAT_SEND_FAIL(head);
return OG_ERROR;
}
mes_consume_with_time(head->cmd, MES_TIME_TEST_SEND, start_stat_time);
return OG_SUCCESS;
}
status_t mes_send_data3(mes_message_head_t *head, uint32 head_size, const void *body)
{
uint64 start_stat_time = 0;
mes_bufflist_t buff_list;
if (mes_check_msg_head(head) != OG_SUCCESS) {
return OG_ERROR;
}
head->extend_size = head_size - sizeof(mes_message_head_t);
if (g_mes.crc_check_switch) {
char *msg = (char *)body;
head->body_cks = mes_calc_cks(msg, head->size - head_size);
head->head_cks = mes_calc_cks((char *)head + sizeof(head->head_cks), head_size - sizeof(head->head_cks));
}
buff_list.cnt = 0;
mes_append_bufflist(&buff_list, head, head_size);
mes_append_bufflist(&buff_list, body, head->size - head_size);
if (head->dst_inst == g_mes.profile.inst_id) {
return mes_send_inter_buffer_list(&buff_list);
}
mes_get_consume_time_start(&start_stat_time);
MES_STAT_SEND(head);
if (MES_SEND_BUFFLIST(&buff_list) != OG_SUCCESS) {
MES_LOGGING(MES_LOGGING_SEND, "send data from %u to %u failed, cmd=%u, rsn=%u.",
head->src_inst, head->dst_inst, head->cmd, head->rsn);
MES_STAT_SEND_FAIL(head);
return OG_ERROR;
}
mes_consume_with_time(head->cmd, MES_TIME_TEST_SEND, start_stat_time);
return OG_SUCCESS;
}
static inline void mes_protect_when_timeout(mes_waiting_room_t *room)
{
cm_spin_lock(&room->lock, NULL);
(void)cm_atomic32_inc((atomic32_t *)(&room->rsn));
if (pthread_mutex_trylock(&room->mutex) == 0) {
mes_release_message_buf(room->msg_buf);
DTC_MES_LOG_INF("[mes]%s: mutex has unlock, rsn=%u, room rsn=%u.", (char *)__func__,
((mes_message_head_t *)room->msg_buf)->rsn, room->rsn);
}
cm_spin_unlock(&room->lock);
}
static status_t mes_check_connect_ready(void)
{
for (uint32 idx = 0; idx < g_mes.profile.inst_count; idx++) {
if (idx == g_mes.profile.inst_id) {
continue;
}
if (!MES_CONNETION_READY(idx)) {
return OG_ERROR;
}
}
return OG_SUCCESS;
}
static status_t mes_recv_impl(uint32 sid, mes_message_t *msg, bool32 check_rsn, uint32 expect_rsn, uint32 timeout,
bool8 quick_stop_check)
{
uint32 timeout_time = 0;
uint64 start_stat_time = 0;
mes_get_consume_time_start(&start_stat_time);
mes_check_sid(sid);
mes_waiting_room_t *room = &g_mes.mes_ctx.waiting_rooms[sid];
cm_spin_lock(&room->lock, NULL);
(void)cm_atomic_set(&room->timeout, timeout);
cm_spin_unlock(&room->lock);
for (;;) {
if (mes_check_connect_ready() != OG_SUCCESS) {
MES_LOGGING_WAR(MES_LOGGING_UNMATCH_MSG, "[mes]%s:Network connection interrupted.", (char *)__func__);
return OG_ERROR;
}
if (!mes_mutex_timed_lock(&room->mutex, MES_WAIT_TIMEOUT)) {
timeout_time = (uint32)cm_atomic_get(&room->timeout);
if ((timeout_time == 0) || ((quick_stop_check == OG_TRUE) && (mes_message_need_timeout()))) {
mes_protect_when_timeout(room);
OG_THROW_ERROR_EX(ERR_TCP_TIMEOUT, "sid(%u) recv timeout, rsn=%u, expect_rsn=%u, timeou=%u.",
sid, room->rsn, expect_rsn, timeout);
return OG_ERROR;
}
uint32 new_timeout_time = timeout_time > MES_WAIT_TIMEOUT ? timeout_time - MES_WAIT_TIMEOUT : 0;
(void)cm_atomic_cas(&room->timeout, timeout_time, new_timeout_time);
continue;
}
MES_MESSAGE_ATTACH(msg, room->msg_buf);
if (SECUREC_UNLIKELY(room->rsn != msg->head->rsn)) {
mes_message_head_t *msg_head = (mes_message_head_t *)msg->buffer;
MES_LOGGING_WAR(MES_LOGGING_UNMATCH_MSG,
"[mes]%s: sid %u receive unmatch msg, rsn=%u, room rsn=%u, cmd=%u.",
(char *)__func__, sid, msg_head->rsn, room->rsn, msg_head->cmd);
mes_release_message_buf(msg->buffer);
MES_MESSAGE_DETACH(msg);
continue;
}
break;
}
mes_consume_with_time(msg->head->cmd, MES_TIME_TEST_RECV, start_stat_time);
return OG_SUCCESS;
}
status_t mes_recv(uint32 sid, mes_message_t *msg, bool32 check_rsn, uint32 expect_rsn, uint32 timeout)
{
return mes_recv_impl(sid, msg, check_rsn, expect_rsn, timeout, OG_TRUE);
}
status_t mes_recv_no_quick_stop(uint32 sid, mes_message_t *msg, bool32 check_rsn, uint32 expect_rsn, uint32 timeout)
{
return mes_recv_impl(sid, msg, check_rsn, expect_rsn, timeout, OG_FALSE);
}
void mes_process_msg_ack(void *session, mes_message_t *msg)
{
if (msg->head->dst_sid >= OG_MAX_MES_ROOMS) {
OG_LOG_RUN_ERR("session id(%u) err, larger than %u", msg->head->dst_sid, OG_MAX_MES_ROOMS);
mes_release_message_buf(msg->buffer);
return;
}
mes_waiting_room_t *room = &g_mes.mes_ctx.waiting_rooms[msg->head->dst_sid];
mes_consume_with_time(0, MES_TIME_MES_ACK, msg->head->req_start_time);
cm_spin_lock(&room->lock, NULL);
if (room->rsn == msg->head->rsn) {
MES_LOG_HEAD(msg->head);
room->msg_buf = msg->buffer;
mes_mutex_unlock(&room->mutex);
cm_spin_unlock(&room->lock);
} else {
cm_spin_unlock(&room->lock);
MES_LOGGING_WAR(MES_LOGGING_UNMATCH_MSG,
"[mes]%s: sid %u receive unmatch msg, rsn=%u, room rsn=%u, cmd=%u.",
(char *)__func__, msg->head->dst_sid, msg->head->rsn, room->rsn, msg->head->cmd);
mes_release_message_buf(msg->buffer);
}
return;
}
status_t mes_wait_acks(uint32 sid, uint32 timeout)
{
uint32 timeout_time = 0;
mes_check_sid(sid);
mes_waiting_room_t *room = &g_mes.mes_ctx.waiting_rooms[sid];
cm_spin_lock(&room->lock, NULL);
(void)cm_atomic_set(&room->timeout, timeout);
cm_spin_unlock(&room->lock);
for (;;) {
if (room->req_count == 0) {
break;
}
if (!mes_mutex_timed_lock(&room->broadcast_mutex, MES_WAIT_TIMEOUT)) {
timeout_time = (uint32)cm_atomic_get(&room->timeout);
if (timeout_time == 0) {
room->ack_count = 0;
OG_THROW_ERROR_EX(ERR_TCP_TIMEOUT, "sid %u wait rsn=%u timeout, cmd=%u, timeou=%u.",
sid, room->rsn, room->cmd, timeout);
DTC_MES_LOG_INF("[mes]%s: sid %u wait rsn=%u timeout, check rsn=%u, cmd=%u, timeou=%u.",
(char *)__func__, sid, room->rsn, room->check_rsn, room->cmd, timeout);
return OG_ERROR;
}
uint32 new_timeout_time = timeout_time > MES_WAIT_TIMEOUT ? timeout_time - MES_WAIT_TIMEOUT : 0;
(void)cm_atomic_cas(&room->timeout, timeout_time, new_timeout_time);
continue;
}
cm_spin_lock(&room->lock, NULL);
if (room->ack_count >= room->req_count) {
cm_spin_unlock(&room->lock);
break;
}
cm_spin_unlock(&room->lock);
}
return OG_SUCCESS;
}
static uint64 mes_get_alive_bitmap(void)
{
cluster_view_t view;
if (DB_CLUSTER_NO_CMS) {
view.version = 0;
view.bitmap = 0;
view.is_stable = OG_TRUE;
return view.bitmap;
}
rc_get_cluster_view(&view, OG_FALSE);
return view.bitmap;
}
static uint64 mes_get_resend_bitmap(uint32 sid)
{
mes_waiting_room_t *room = &g_mes.mes_ctx.waiting_rooms[sid];
uint64 alive_inst = mes_get_alive_bitmap();
uint64 resend_bitmap = 0;
for (uint32 i = 0; i < g_mes.profile.inst_count; i++) {
if (i == g_mes.profile.inst_id) {
continue;
}
if ((rc_bitmap64_exist(&room->req_bitmap, i)) && (!rc_bitmap64_exist(&room->ack_bitmap, i))) {
if (!rc_bitmap64_exist(&alive_inst, i)) {
continue;
}
rc_bitmap64_set(&resend_bitmap, i);
}
}
OG_LOG_RUN_WAR("[mes]req_bit=%llu, ack_bit=%llu, alive_inst=%llu, resend_bit=%llu, rsn=%u", room->req_bitmap,
room->ack_bitmap, alive_inst, resend_bitmap, room->rsn);
return resend_bitmap;
}
status_t mes_wait_acks_new(uint32 sid, uint32 timeout, uint64 *resend_bits)
{
uint32 timeout_time = 0;
mes_check_sid(sid);
mes_waiting_room_t *room = &g_mes.mes_ctx.waiting_rooms[sid];
cm_spin_lock(&room->lock, NULL);
(void)cm_atomic_set(&room->timeout, timeout);
cm_spin_unlock(&room->lock);
for (;;) {
if (room->req_count == 0) {
break;
}
if (!mes_mutex_timed_lock(&room->broadcast_mutex, MES_WAIT_TIMEOUT)) {
timeout_time = (uint32)cm_atomic_get(&room->timeout);
if (timeout_time == 0) {
*resend_bits = mes_get_resend_bitmap(sid);
if (*resend_bits == 0) {
break;
}
room->ack_count = 0;
room->ack_bitmap = 0;
OG_LOG_RUN_WAR("[mes] sid %u wait cmd=%u acks timeout, rsn=%u, check rsn=%u, start_time=%llu",
sid, room->cmd, room->rsn, room->check_rsn, room->req_start_time);
return OG_ERROR;
}
uint32 new_timeout_time = timeout_time > MES_WAIT_TIMEOUT ? timeout_time - MES_WAIT_TIMEOUT : 0;
(void)cm_atomic_cas(&room->timeout, timeout_time, new_timeout_time);
continue;
}
cm_spin_lock(&room->lock, NULL);
if (room->ack_count >= room->req_count) {
cm_spin_unlock(&room->lock);
break;
}
cm_spin_unlock(&room->lock);
}
return OG_SUCCESS;
}
status_t mes_broadcast_bufflist_and_wait_with_retry(uint32 sid, uint64 target_bits,
mes_message_head_t *head, uint16 head_size, const void *body, uint32 timeout, uint32 retry_threshold)
{
uint64 resend_bits = 0;
uint32 retry_times = 0;
status_t ret = OG_ERROR;
mes_check_sid(sid);
mes_waiting_room_t *room = &g_mes.mes_ctx.waiting_rooms[sid];
room->err_code = OG_SUCCESS;
mes_broadcast_bufflist_with_retry(sid, target_bits, head, head_size, body);
ret = mes_wait_acks_new(sid, timeout, &resend_bits);
while (ret != OG_SUCCESS) {
if (retry_times >= retry_threshold) {
OG_LOG_RUN_ERR("[mes]broadcast and wait exceeds threshold %u.", retry_threshold);
return OG_ERROR;
}
head->rsn = mes_get_rsn(sid);
OG_LOG_RUN_WAR("[mes]need retry broadcast, resend bits %llu, retry_times %u, cmd %u, rsn %u",
resend_bits, retry_times, head->cmd, head->rsn);
mes_broadcast_bufflist_with_retry(sid, resend_bits, head, head_size, body);
ret = mes_wait_acks_new(sid, timeout, &resend_bits);
retry_times++;
}
if (room->err_code != OG_SUCCESS) {
MES_LOGGING(MES_LOGGING_MESSAGE_STATUS_ERR, "[mes] dst node return false, cmd=%u.", head->cmd);
return OG_ERROR;
}
return OG_SUCCESS;
}
void mes_process_broadcast_ack(void *session, mes_message_t *msg)
{
if (msg->head->dst_sid >= OG_MAX_MES_ROOMS) {
OG_LOG_RUN_ERR("session id(%u) err, larger than %u", msg->head->dst_sid, OG_MAX_MES_ROOMS);
mes_release_message_buf(msg->buffer);
return;
}
mes_waiting_room_t *room = &g_mes.mes_ctx.waiting_rooms[msg->head->dst_sid];
cm_spin_lock(&room->lock, NULL);
if (room->rsn == msg->head->rsn) {
if (msg->head->status != OG_SUCCESS) {
room->err_code = msg->head->status;
}
(void)cm_atomic32_inc(&room->ack_count);
rc_bitmap64_set(&room->ack_bitmap, msg->head->src_inst);
if (room->ack_count >= room->req_count) {
room->check_rsn = msg->head->rsn;
mes_mutex_unlock(&room->broadcast_mutex);
}
cm_spin_unlock(&room->lock);
} else {
cm_spin_unlock(&room->lock);
MES_LOGGING_WAR(MES_LOGGING_UNMATCH_MSG,
"[mes]%s: sid %u receive unmatch msg, rsn=%u, room rsn=%u, cmd=%u.",
(char *)__func__, msg->head->dst_sid, msg->head->rsn, room->rsn, msg->head->cmd);
}
mes_release_message_buf(msg->buffer);
return;
}
static inline void mes_set_inst_bits(uint64 *success_inst, uint64 send_inst)
{
if (success_inst != NULL) {
*success_inst = send_inst;
}
}
static void mes_init_waiting_room(mes_waiting_room_t *room, mes_message_head_t *head, uint64 target_bits)
{
cm_spin_lock(&room->lock, NULL);
room->req_count = g_mes.profile.inst_count - 1;
room->req_bitmap = MES_GET_INST_BITMAP(g_mes.profile.inst_count);
room->ack_count = 0;
room->ack_bitmap = 0;
room->cmd = head->cmd;
room->req_start_time = head->req_start_time;
for (uint32 i = 0; i < g_mes.profile.inst_count; i++) {
if (SECUREC_UNLIKELY(i == g_mes.profile.inst_id)) {
rc_bitmap64_clear(&(room->req_bitmap), i);
continue;
}
if (!MES_IS_INST_SEND(target_bits, i)) {
(void)cm_atomic32_dec(&room->req_count);
rc_bitmap64_clear(&(room->req_bitmap), i);
}
}
cm_spin_unlock(&room->lock);
}
void mes_broadcast(uint32 sid, uint64 inst_bits, const void *msg_data, uint64 *success_inst)
{
uint64 start_stat_time = 0;
uint32 i;
uint64 send_inst = 0;
mes_check_sid(sid);
mes_message_head_t *head = (mes_message_head_t *)msg_data;
mes_waiting_room_t *room = &g_mes.mes_ctx.waiting_rooms[sid];
cm_spin_lock(&room->lock, NULL);
room->req_count = g_mes.profile.inst_count - 1;
room->ack_count = 0;
room->cmd = head->cmd;
room->req_start_time = head->req_start_time;
for (i = 0; i < g_mes.profile.inst_count; i++) {
if (SECUREC_UNLIKELY(i == g_mes.profile.inst_id)) {
continue;
}
if (!MES_IS_INST_SEND(inst_bits, i)) {
(void)cm_atomic32_dec(&room->req_count);
}
}
cm_spin_unlock(&room->lock);
mes_get_consume_time_start(&start_stat_time);
for (i = 0; i < g_mes.profile.inst_count; i++) {
if (SECUREC_UNLIKELY(i == g_mes.profile.inst_id)) {
continue;
}
if (MES_IS_INST_SEND(inst_bits, i)) {
head->dst_inst = i;
if (mes_send_data(msg_data) != OG_SUCCESS) {
(void)cm_atomic32_dec(&room->req_count);
MES_LOGGING_WAR(MES_LOGGING_BROADCAST, "[mes]: multicast to instance %d failed", i);
continue;
}
MES_INST_SENT_SUCCESS(send_inst, i);
MES_STAT_SEND(head);
}
}
mes_set_inst_bits(success_inst, send_inst);
mes_consume_with_time(head->cmd, MES_TIME_TEST_MULTICAST, start_stat_time);
return;
}
void mes_broadcast_data_with_retry(uint32 sid, uint64 target_bits, const void *msg_data, bool8 allow_send_fail)
{
uint64 start_stat_time = 0;
status_t ret = OG_ERROR;
mes_check_sid(sid);
mes_message_head_t *head = (mes_message_head_t *)msg_data;
mes_waiting_room_t *room = &g_mes.mes_ctx.waiting_rooms[sid];
mes_init_waiting_room(room, head, target_bits);
mes_get_consume_time_start(&start_stat_time);
for (uint32 i = 0; i < g_mes.profile.inst_count; i++) {
if (SECUREC_UNLIKELY(i == g_mes.profile.inst_id)) {
continue;
}
if (!MES_IS_INST_SEND(target_bits, i)) {
continue;
}
head->dst_inst = i;
do {
ret = mes_send_data(msg_data);
if (ret == OG_SUCCESS) {
MES_STAT_SEND(head);
break;
}
uint64 alive_inst = mes_get_alive_bitmap();
if (!rc_bitmap64_exist(&alive_inst, i)) {
(void)cm_atomic32_dec(&room->req_count);
rc_bitmap64_clear(&(room->req_bitmap), i);
MES_LOGGING(MES_LOGGING_BROADCAST, "[mes] dst inst %d is not alive, alive_bit %llu, cmd=%u, rsn=%u",
i, alive_inst, head->cmd, head->rsn);
break;
}
if (allow_send_fail && !MES_CONNETION_READY(i)) {
MES_LOGGING(MES_LOGGING_BROADCAST, "[mes] dst inst %d is not connected, cmd=%u, rsn=%u",
i, head->cmd, head->rsn);
break;
}
MES_LOGGING(MES_LOGGING_BROADCAST, "[mes] dst inst %u is alive, need retry, alive_bit=%llu, cmd=%u, rsn=%u",
i, alive_inst, head->cmd, head->rsn);
cm_sleep(MES_BROADCAST_SEND_TIME_INTERVAL);
} while (OG_TRUE);
}
mes_consume_with_time(head->cmd, MES_TIME_TEST_MULTICAST, start_stat_time);
return;
}
status_t mes_broadcast_and_wait(uint32 sid, uint64 inst_bits, const void *msg_data, uint32 timeout,
uint64 *success_inst)
{
uint64 start_stat_time = 0;
mes_get_consume_time_start(&start_stat_time);
mes_check_sid(sid);
mes_waiting_room_t *room = &g_mes.mes_ctx.waiting_rooms[sid];
room->err_code = OG_SUCCESS;
mes_broadcast(sid, inst_bits, msg_data, success_inst);
if (mes_wait_acks(sid, timeout) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[mes]mes_wait_acks failed.");
return OG_ERROR;
}
if (room->err_code != OG_SUCCESS) {
MES_LOGGING(MES_LOGGING_MESSAGE_STATUS_ERR, "[mes] dst node return false, cmd=%u.", ((mes_message_head_t *)msg_data)->cmd);
return OG_ERROR;
}
mes_consume_with_time(((mes_message_head_t *)msg_data)->cmd, MES_TIME_TEST_MULTICAST_AND_WAIT, start_stat_time);
return OG_SUCCESS;
}
static status_t mes_broadcast_data_with_retry_impl(uint32 sid, uint64 target_bits,
const void *msg_data, uint32 timeout, uint32 retry_threshold, bool8 allow_send_fail)
{
status_t ret = OG_ERROR;
uint64 resend_bits = 0;
uint32 retry_times = 0;
mes_check_sid(sid);
mes_waiting_room_t *room = &g_mes.mes_ctx.waiting_rooms[sid];
mes_message_head_t *head = (mes_message_head_t *)msg_data;
room->err_code = OG_SUCCESS;
mes_broadcast_data_with_retry(sid, target_bits, msg_data, allow_send_fail);
ret = mes_wait_acks_new(sid, timeout, &resend_bits);
while (ret != OG_SUCCESS) {
if (retry_times >= retry_threshold) {
OG_LOG_RUN_ERR("[mes]broadcast and wait exceeds threshold %u.", retry_threshold);
return OG_ERROR;
}
head->rsn = mes_get_rsn(sid);
OG_LOG_RUN_WAR("[mes]need retry broadcast data, resend bits %llu, retry_times %u, cmd %u, rsn %u",
resend_bits, retry_times, head->cmd, head->rsn);
mes_broadcast_data_with_retry(sid, target_bits, msg_data, allow_send_fail);
ret = mes_wait_acks_new(sid, timeout, &resend_bits);
retry_times++;
}
if (room->err_code != OG_SUCCESS) {
MES_LOGGING(MES_LOGGING_MESSAGE_STATUS_ERR, "[mes] dst node return false, cmd=%u.", head->cmd);
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t mes_broadcast_data_and_wait_with_retry(uint32 sid, uint64 target_bits, const void *msg_data, uint32 timeout,
uint32 retry_threshold)
{
return mes_broadcast_data_with_retry_impl(sid, target_bits, msg_data, timeout, retry_threshold, OG_FALSE);
}
status_t mes_broadcast_data_and_wait_with_retry_allow_send_fail(uint32 sid, uint64 target_bits, const void *msg_data,
uint32 timeout, uint32 retry_threshold)
{
return mes_broadcast_data_with_retry_impl(sid, target_bits, msg_data, timeout, retry_threshold, OG_TRUE);
}
void mes_broadcast_data3(uint32 sid, mes_message_head_t *head, uint32 head_size, const void *body)
{
uint64 start_stat_time = 0;
const char *msg = NULL;
source_location_t loc;
int32 code;
mes_check_sid(sid);
mes_waiting_room_t *room = &g_mes.mes_ctx.waiting_rooms[sid];
cm_get_error(&code, &msg, &loc);
cm_spin_lock(&room->lock, NULL);
room->req_count = g_mes.profile.inst_count - 1;
room->req_bitmap = 0;
room->ack_count = 0;
room->ack_bitmap = 0;
cm_spin_unlock(&room->lock);
mes_get_consume_time_start(&start_stat_time);
for (uint32 i = 0; i < g_mes.profile.inst_count; i++) {
if (i == g_mes.profile.inst_id) {
continue;
}
head->dst_inst = i;
if (mes_send_data3(head, head_size, body) != OG_SUCCESS) {
cm_revert_error(code, msg, &loc);
(void)cm_atomic32_dec(&room->req_count);
MES_LOGGING(MES_LOGGING_BROADCAST, "MES broadcast to instance %d failed", i);
continue;
}
rc_bitmap64_set(&(room->req_bitmap), i);
MES_STAT_SEND(head);
}
mes_consume_with_time(head->cmd, MES_TIME_TEST_BROADCAST, start_stat_time);
return;
}
void mes_broadcast_bufflist_with_retry(uint32 sid, uint64 target_bits, mes_message_head_t *head,
uint16 head_size, const void *body)
{
uint32 i;
status_t ret = OG_ERROR;
mes_check_sid(sid);
mes_waiting_room_t *room = &g_mes.mes_ctx.waiting_rooms[sid];
mes_init_waiting_room(room, head, target_bits);
for (i = 0; i < g_mes.profile.inst_count; i++) {
if (i == g_mes.profile.inst_id) {
continue;
}
if (!MES_IS_INST_SEND(target_bits, i)) {
continue;
}
head->dst_inst = i;
do {
ret = mes_send_data3(head, head_size, body);
if (ret == OG_SUCCESS) {
MES_STAT_SEND(head);
break;
}
uint64 alive_inst = mes_get_alive_bitmap();
if (!rc_bitmap64_exist(&alive_inst, i)) {
(void)cm_atomic32_dec(&room->req_count);
rc_bitmap64_clear(&(room->req_bitmap), i);
MES_LOGGING(MES_LOGGING_BROADCAST, "[mes] dst inst %d is not alive, alive_bit %llu, cmd=%u, rsn=%u",
i, alive_inst, head->cmd, head->rsn);
break;
}
MES_LOGGING(MES_LOGGING_BROADCAST, "[mes] dst inst %u is alive, need retry, alive_bit=%llu, cmd=%u, rsn=%u",
i, alive_inst, head->cmd, head->rsn);
cm_sleep(MES_BROADCAST_SEND_TIME_INTERVAL);
} while (OG_TRUE);
}
return;
}
status_t mes_message_vertify_cks(mes_message_t *msg)
{
if (msg == NULL) {
OG_LOG_RUN_ERR("mes vertify cks input msg is null");
return OG_ERROR;
}
uint16 cks;
uint16 headSize = sizeof(mes_message_head_t) + msg->head->extend_size;
if (!mes_verify_cks(msg->head->head_cks, (char *)msg->head + sizeof(msg->head->head_cks), headSize -
sizeof(msg->head->head_cks), &cks)) {
mes_release_message_buf(msg->buffer);
OG_LOG_RUN_ERR("[mes] check head cks failed, cks=%u, old cks=%u, msg_size=%u, ext_size=%u, "
"cmd=%u, rsn=%u, src_inst=%u, dst_inst=%u, src_sid=%u, dst_sid=%u", cks, msg->head->head_cks,
msg->head->size, msg->head->extend_size, msg->head->cmd, msg->head->rsn, msg->head->src_inst,
msg->head->dst_inst, msg->head->src_sid, msg->head->dst_sid);
return OG_ERROR;
}
char *body = msg->buffer + headSize;
if (!mes_verify_cks(msg->head->body_cks, body, msg->head->size - headSize, &cks)) {
mes_release_message_buf(msg->buffer);
OG_LOG_RUN_ERR("[mes] check cks failed, cks=%u, old cks=%u, msg_size=%u, ext_size=%u, "
"cmd=%u, rsn=%u, src_inst=%u, dst_inst=%u, src_sid=%u, dst_sid=%u", cks, msg->head->body_cks,
msg->head->size, msg->head->extend_size, msg->head->cmd, msg->head->rsn, msg->head->src_inst,
msg->head->dst_inst, msg->head->src_sid, msg->head->dst_sid);
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t mes_register_proc_func(mes_message_proc_t proc)
{
g_mes.proc = proc;
return OG_SUCCESS;
}
void mes_set_msg_enqueue(mes_command_t command, bool32 is_enqueue)
{
g_mes.is_enqueue[command] = is_enqueue;
return;
}
bool32 mes_get_msg_enqueue(mes_command_t command)
{
return g_mes.is_enqueue[command];
}
uint32 mes_get_msg_queue_length(uint8 group_id)
{
return g_mes.mq_ctx.group.task_group[group_id].queue.count;
}
uint32 mes_get_msg_task_queue_length(uint32 task_index)
{
return g_mes.mq_ctx.tasks[task_index].queue.count;
}
mes_channel_stat_t mes_get_channel_state(uint8 inst_id)
{
if (g_mes.profile.pipe_type == CS_TYPE_TCP) {
return mes_tcp_get_channel_state(inst_id);
} else if (g_mes.profile.pipe_type == CS_TYPE_UC || g_mes.profile.pipe_type == CS_TYPE_UC_RDMA) {
return mes_uc_get_channel_state(inst_id);
}
return MES_CHANNEL_CEIL;
}
thread_t* mes_get_msg_task_thread(uint32 task_index)
{
return &(g_mes.mq_ctx.tasks[task_index].thread);
}
uint32 mes_get_rsn(uint32 sid)
{
uint32 rsn;
mes_check_sid(sid);
mes_waiting_room_t *room = &g_mes.mes_ctx.waiting_rooms[sid];
cm_spin_lock(&room->lock, NULL);
rsn = cm_atomic32_inc((atomic32_t *)(&room->rsn));
cm_spin_unlock(&room->lock);
return rsn;
}
uint32 mes_get_current_rsn(uint32 sid)
{
uint32 rsn;
mes_check_sid(sid);
mes_waiting_room_t *room = &g_mes.mes_ctx.waiting_rooms[sid];
cm_spin_lock(&room->lock, NULL);
rsn = room->rsn;
cm_spin_unlock(&room->lock);
return rsn;
}
#ifdef WIN32
void mes_mutex_destroy(mes_mutex_t *mutex)
{
(void)CloseHandle(*mutex);
}
status_t mes_mutex_create(mes_mutex_t *mutex)
{
*mutex = CreateSemaphore(NULL, 0, OG_MAX_MES_ROOMS, NULL);
if (*mutex == NULL) {
OG_THROW_ERROR_EX(ERR_MES_CREATE_MUTEX, "errno: %d", GetLastError());
return OG_ERROR;
}
return OG_SUCCESS;
}
void mes_mutex_lock(mes_mutex_t *mutex)
{
(void)WaitForSingleObject(*mutex, INFINITE);
}
bool32 mes_mutex_timed_lock(mes_mutex_t *mutex, uint32 timeout)
{
uint32 code = WaitForSingleObject(*mutex, timeout);
return (code == WAIT_OBJECT_0);
}
void mes_mutex_unlock(mes_mutex_t *mutex)
{
ReleaseSemaphore(*mutex, 1, NULL);
}
#else
void mes_mutex_destroy(mes_mutex_t *mutex)
{
(void)pthread_mutex_destroy(mutex);
}
status_t mes_mutex_create(mes_mutex_t *mutex)
{
if (0 != pthread_mutex_init(mutex, NULL)) {
OG_THROW_ERROR_EX(ERR_MES_CREATE_MUTEX, "errno: %d", (int32)errno);
return OG_ERROR;
}
(void)pthread_mutex_lock(mutex);
return OG_SUCCESS;
}
void mes_mutex_lock(mes_mutex_t *mutex)
{
(void)pthread_mutex_lock(mutex);
}
static void mes_get_timespec(struct timespec *tim, uint32 timeout)
{
struct timespec tv;
(void)clock_gettime(CLOCK_REALTIME, &tv);
tim->tv_sec = tv.tv_sec + timeout / 1000;
tim->tv_nsec = tv.tv_nsec + ((long)timeout % 1000) * 1000000;
if (tim->tv_nsec >= 1000000000) {
tim->tv_sec++;
tim->tv_nsec -= 1000000000;
}
}
bool32 mes_mutex_timed_lock(mes_mutex_t *mutex, uint32 timeout)
{
struct timespec ts;
mes_get_timespec(&ts, timeout);
return (pthread_mutex_timedlock(mutex, &ts) == 0);
}
void mes_mutex_unlock(mes_mutex_t *mutex)
{
(void)pthread_mutex_unlock(mutex);
}
#endif
static void cm_get_time_of_day(cm_timeval *tv)
{
int ret;
ret = cm_gettimeofday(tv);
if (ret != 0) {
perror("[Time Error]cm_get_time_of_day error");
}
}
uint64 cm_get_time_usec(void)
{
if (g_mes_elapsed_stat.mes_elapsed_switch) {
cm_timeval now;
uint64 now_usec;
cm_get_time_of_day(&now);
now_usec = (uint64)now.tv_sec * (uint64)(1000 * 1000) + (uint64)now.tv_usec;
return now_usec;
}
return 0;
}
void mes_send_error_msg(mes_message_head_t *head)
{
mes_error_msg_t error_msg;
mes_init_ack_head(head, &error_msg.head, MES_CMD_ERROR_MSG, sizeof(mes_error_msg_t) + OG_MESSAGE_BUFFER_SIZE,
OG_INVALID_ID16);
error_msg.code = g_tls_error.code;
error_msg.loc = g_tls_error.loc;
if (mes_send_data3(&error_msg.head, sizeof(mes_error_msg_t), g_tls_error.message) != OG_SUCCESS) {
OG_LOG_RUN_ERR("send error msg to instance %d failed.", head->src_inst);
}
cm_reset_error();
}
void mes_handle_error_msg(const void *msg_data)
{
mes_error_msg_t *error_msg = (mes_error_msg_t *)msg_data;
char *message = (char *)error_msg + sizeof(mes_error_msg_t);
errno_t ret;
if (g_tls_error.is_ignored) {
return;
}
g_tls_error.code = error_msg->code;
g_tls_error.loc = error_msg->loc;
ret = memcpy_sp(g_tls_error.message, OG_MESSAGE_BUFFER_SIZE, message, OG_MESSAGE_BUFFER_SIZE);
MEMS_RETVOID_IFERR(ret);
}
int64 mes_get_stat_send_count(mes_command_t cmd)
{
return g_mes_stat[cmd].send_count;
}
int64 mes_get_stat_send_fail_count(mes_command_t cmd)
{
return g_mes_stat[cmd].send_fail_count;
}
int64 mes_get_stat_recv_count(mes_command_t cmd)
{
return g_mes_stat[cmd].recv_count;
}
int64 mes_get_stat_local_count(mes_command_t cmd)
{
return g_mes_stat[cmd].local_count;
}
atomic32_t mes_get_stat_dealing_count(mes_command_t cmd)
{
return g_mes_stat[cmd].dealing_count;
}
bool8 mes_get_stat_non_empty(mes_command_t cmd)
{
return g_mes_stat[cmd].non_empty;
}
bool8 mes_get_elapsed_switch(void)
{
return g_mes_elapsed_stat.mes_elapsed_switch;
}
void mes_set_elapsed_switch(bool8 elapsed_switch)
{
g_mes_elapsed_stat.mes_elapsed_switch = elapsed_switch;
OG_LOG_RUN_INF("mes elapsed switch = %d.", g_mes_elapsed_stat.mes_elapsed_switch);
}
void mes_set_crc_check_switch(bool8 crc_check_switch)
{
g_mes.crc_check_switch = crc_check_switch;
OG_LOG_RUN_INF("mes crc check switch = %d.", g_mes.crc_check_switch);
}
void mes_set_ssl_switch(bool8 use_ssl)
{
g_mes.profile.use_ssl = use_ssl;
OG_LOG_RUN_INF("[mes] ssl switch = %d.", g_mes.profile.use_ssl);
}
status_t mes_set_ssl_crt_file(const char *cert_dir, const char *ca_file, const char *cert_file, const char *key_file,
const char* crl_file, const char* pass_file)
{
if (cert_dir == NULL || ca_file == NULL || cert_dir == NULL || key_file == NULL || crl_file == NULL || pass_file ==
NULL) {
return OG_ERROR;
}
MEMS_RETURN_IFERR(memcpy_sp(g_mes_ssl_auth_file.cert_dir, OG_FILE_NAME_BUFFER_SIZE, cert_dir,
OG_FILE_NAME_BUFFER_SIZE));
MEMS_RETURN_IFERR(memcpy_sp(g_mes_ssl_auth_file.ca_file, OG_FILE_NAME_BUFFER_SIZE, ca_file,
OG_FILE_NAME_BUFFER_SIZE));
MEMS_RETURN_IFERR(memcpy_sp(g_mes_ssl_auth_file.cert_file, OG_FILE_NAME_BUFFER_SIZE, cert_file,
OG_FILE_NAME_BUFFER_SIZE));
MEMS_RETURN_IFERR(memcpy_sp(g_mes_ssl_auth_file.key_file, OG_FILE_NAME_BUFFER_SIZE, key_file,
OG_FILE_NAME_BUFFER_SIZE));
MEMS_RETURN_IFERR(memcpy_sp(g_mes_ssl_auth_file.pass_file, OG_FILE_NAME_BUFFER_SIZE, pass_file,
OG_FILE_NAME_BUFFER_SIZE));
if (cm_file_exist(crl_file)) {
MEMS_RETURN_IFERR(memcpy_sp(g_mes_ssl_auth_file.crl_file, OG_FILE_NAME_BUFFER_SIZE, crl_file,
OG_FILE_NAME_BUFFER_SIZE));
} else {
MEMS_RETURN_IFERR(memset_sp(g_mes_ssl_auth_file.crl_file, OG_FILE_NAME_BUFFER_SIZE, 0,
OG_FILE_NAME_BUFFER_SIZE));
}
return OG_SUCCESS;
}
status_t mes_set_ssl_key_pwd(const char *enc_pwd)
{
if (enc_pwd != NULL) {
MEMS_RETURN_IFERR(memcpy_sp(g_mes_ssl_auth_file.key_pwd, OG_PASSWORD_BUFFER_SIZE, enc_pwd, strlen(enc_pwd)));
} else {
MEMS_RETURN_IFERR(memset_sp(g_mes_ssl_auth_file.key_pwd, OG_PASSWORD_BUFFER_SIZE, 0, OG_PASSWORD_BUFFER_SIZE));
}
return OG_SUCCESS;
}
void mes_set_ssl_verify_peer(bool32 verify_peer)
{
g_mes.profile.ssl_verify_peer = verify_peer;
}
ssl_auth_file_t *mes_get_ssl_auth_file(void)
{
return &g_mes_ssl_auth_file;
}
void mes_set_dbstor_enable(bool32 enable)
{
g_enable_dbstor = enable;
}
uint64 mes_get_elapsed_time(mes_command_t cmd, mes_time_stat_t type)
{
return g_mes_elapsed_stat.time_consume_stat[cmd].time[type];
}
int64 mes_get_elapsed_count(mes_command_t cmd, mes_time_stat_t type)
{
return g_mes_elapsed_stat.time_consume_stat[cmd].count[type];
}
bool8 mes_get_elapsed_non_empty(mes_command_t cmd)
{
return g_mes_elapsed_stat.time_consume_stat[cmd].non_empty;
}
bool8 mes_is_inst_connect(uint32 inst_id)
{
return g_mes.mes_ctx.conn_arr[inst_id].is_connect;
}
uint8 mes_get_cmd_group(mes_command_t cmd)
{
return g_mes.mq_ctx.command_attr[cmd].group_id;
}
void mes_init_mq_local_queue(void)
{
init_msgqueue(&g_mes.mq_ctx.local_queue);
}
status_t mes_set_process_config(void)
{
return mes_uc_set_process_config();
}