* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* mes_msg_pool.c
*
*
* IDENTIFICATION
* src/mec/mes_msg_pool.c
*
* -------------------------------------------------------------------------
*/
#include "mes_log_module.h"
#include "mes_func.h"
message_pool_t *g_send_pool;
message_pool_t *g_recv_pool;
status_t mes_init_message_pool(void)
{
status_t ret;
if ((g_mes.profile.buffer_pool_attr.pool_count == 0) ||
(g_mes.profile.buffer_pool_attr.pool_count > MES_MAX_BUFFER_STEP_NUM)) {
OG_THROW_ERROR_EX(ERR_MES_PARAMETER, "[mes] pool_count %u is invalid, legal scope is [1, %u].",
g_mes.profile.buffer_pool_attr.pool_count, MES_MAX_BUFFER_STEP_NUM);
return OG_ERROR;
}
for (uint32 i = 0; i < g_mes.profile.buffer_pool_attr.pool_count; i++) {
ret = mes_create_buffer_chunk(&g_mes.mes_ctx.msg_pool.chunk[i], i, &g_mes.profile.buffer_pool_attr.buf_attr[i]);
if (ret != OG_SUCCESS) {
OG_LOG_RUN_ERR("[mes]: create buf chunk failed.");
return OG_ERROR;
}
}
g_mes.mes_ctx.msg_pool.count = g_mes.profile.buffer_pool_attr.pool_count;
return OG_SUCCESS;
}
void mes_destory_message_pool(void)
{
for (uint32 i = 0; i < g_mes.profile.buffer_pool_attr.pool_count; i++) {
mes_destory_buffer_chunk(&g_mes.mes_ctx.msg_pool.chunk[i]);
}
return;
}
void mes_init_buf_queue(mes_buf_queue_t *queue)
{
queue->lock = 0;
queue->first = NULL;
queue->last = NULL;
queue->count = 0;
queue->addr = NULL;
}
status_t mes_create_buffer_queue(mes_buf_queue_t *queue, uint8 chunk_no, uint8 queue_no, uint32 buf_size,
uint32 buf_count)
{
uint64 mem_size;
mes_buffer_item_t *buf_node;
mes_buffer_item_t *buf_node_next;
uint32 buf_item_size;
char *temp_buffer;
if (buf_count == 0) {
OG_THROW_ERROR_EX(ERR_MES_CREATE_AREA, "mes_pool_size should greater than 0.");
return OG_ERROR;
}
mes_init_buf_queue(queue);
queue->chunk_no = chunk_no;
queue->queue_no = queue_no;
queue->buf_size = buf_size;
queue->count = buf_count;
buf_item_size = sizeof(mes_buffer_item_t) - MES_MIN_BUFFER_SIZE + buf_size;
mem_size = (uint64)buf_count * (uint64)buf_item_size;
queue->addr = malloc(mem_size);
if (queue->addr == NULL) {
OG_THROW_ERROR_EX(ERR_MES_CREATE_AREA, "allocate memory size %llu for MES msg pool failed", mem_size);
return OG_ERROR;
}
temp_buffer = queue->addr;
buf_node = (mes_buffer_item_t *)temp_buffer;
queue->first = buf_node;
for (uint32 i = 1; i < buf_count; i++) {
temp_buffer += buf_item_size;
buf_node_next = (mes_buffer_item_t *)temp_buffer;
buf_node->chunk_no = chunk_no;
buf_node->queue_no = queue_no;
buf_node->next = buf_node_next;
buf_node = buf_node_next;
}
buf_node->chunk_no = chunk_no;
buf_node->queue_no = queue_no;
buf_node->next = NULL;
queue->last = buf_node;
return OG_SUCCESS;
}
void mes_destory_buffer_queue(mes_buf_queue_t *queue)
{
if (queue == NULL || queue->addr == NULL) {
return;
}
free(queue->addr);
queue->addr = NULL;
}
static void mes_set_buffer_queue_count(mes_buf_chunk_t *chunk, uint32 queue_num, uint32 tatol_count)
{
uint32 buf_count;
uint32 buf_leftover;
if (queue_num == 0) {
return;
}
buf_count = tatol_count / queue_num;
buf_leftover = tatol_count % queue_num;
for (uint32 i = 0; i < queue_num; i++) {
chunk->queues[i].count = buf_count;
}
for (uint32 i = 0; i < buf_leftover; i++) {
chunk->queues[i].count++;
}
return;
}
status_t mes_create_buffer_chunk(mes_buf_chunk_t *chunk, uint32 chunk_no, mes_buffer_attr_t *buf_attr)
{
errno_t ret;
uint32 queues_size;
uint32 queue_num = buf_attr->queue_count;
if (queue_num == 0 || queue_num > MES_MAX_BUFFER_QUEUE_NUM) {
OG_THROW_ERROR_EX(ERR_MES_PARAMETER, "[mes] pool_count %u is invalid, legal scope is [1, %u].", queue_num,
MES_MAX_BUFFER_STEP_NUM);
return OG_ERROR;
}
queues_size = queue_num * sizeof(mes_buf_queue_t);
chunk->queues = (mes_buf_queue_t *)malloc(queues_size);
if (chunk->queues == NULL) {
OG_THROW_ERROR_EX(ERR_MES_CREATE_AREA, "allocate memory queue_num %u failed", queue_num);
return OG_ERROR;
}
ret = memset_sp(chunk->queues, queues_size, 0, queues_size);
MEMS_RETURN_IFERR(ret);
chunk->chunk_no = (uint8)chunk_no;
chunk->buf_size = buf_attr->size;
chunk->queue_num = queue_num;
chunk->current_no = 0;
mes_set_buffer_queue_count(chunk, queue_num, buf_attr->count);
for (uint32 i = 0; i < queue_num; i++) {
if (mes_create_buffer_queue(&chunk->queues[i], chunk_no, i, buf_attr->size, chunk->queues[i].count) !=
OG_SUCCESS) {
OG_LOG_RUN_ERR("[mes]: create buf queue failed.");
return OG_ERROR;
}
}
return OG_SUCCESS;
}
void mes_destory_buffer_chunk(mes_buf_chunk_t *chunk)
{
if (chunk == NULL || chunk->queues == NULL) {
return;
}
for (uint32 i = 0; i < chunk->queue_num; i++) {
mes_destory_buffer_queue(&chunk->queues[i]);
}
free(chunk->queues);
chunk->queues = NULL;
return;
}
static inline mes_buf_chunk_t *mes_get_buffer_chunk(uint32 len)
{
mes_buf_chunk_t *chunk;
for (uint32 i = 0; i < g_mes.mes_ctx.msg_pool.count; i++) {
chunk = &g_mes.mes_ctx.msg_pool.chunk[i];
if (len <= chunk->buf_size) {
return chunk;
}
}
OG_LOG_RUN_ERR("[mes]: There is not long enough buffer pool for %u.", len);
return NULL;
}
static inline mes_buf_queue_t *mes_get_buffer_queue(mes_buf_chunk_t *chunk)
{
mes_buf_queue_t *queue = NULL;
queue = &chunk->queues[chunk->current_no % chunk->queue_num];
chunk->current_no++;
return queue;
}
static void print_no_buffer_log(void)
{
int32 deal_count = 0;
uint32 queue_len = 0;
MES_LOGGING(MES_LOGGING_GET_BUF, "[mes]: There is no buffer, sleep and try again.");
for (uint32 cmd_loop = 1; cmd_loop < MES_CMD_CEIL; cmd_loop++) {
deal_count = mes_get_stat_dealing_count(cmd_loop);
if (deal_count > 0) {
MES_CMD_LOGGING(cmd_loop, "There is no buffer, cmd = %u, deal_count = %d", cmd_loop, deal_count);
}
}
for (uint32 group_id = 0; group_id < MES_TASK_GROUP_ALL; group_id++) {
queue_len = mes_get_msg_queue_length(group_id);
if (queue_len > 0) {
MES_GROUP_LOGGING(group_id, "There is no buffer, group = %u, queue_len = %d", group_id, queue_len);
}
}
}
char *mes_alloc_buf_item(uint32 len)
{
mes_buf_chunk_t *chunk = NULL;
mes_buf_queue_t *queue = NULL;
mes_buffer_item_t *buf_node = NULL;
uint32 find_times = 0;
chunk = mes_get_buffer_chunk(len);
if (chunk == NULL || chunk->queues == NULL) {
OG_LOG_RUN_ERR("[mes]: Get buffer failed.");
return NULL;
}
do {
queue = mes_get_buffer_queue(chunk);
cm_spin_lock(&queue->lock, NULL);
if (queue->count > 0) {
buf_node = queue->first;
queue->first = buf_node->next;
queue->count--;
buf_node->next = NULL;
cm_spin_unlock(&queue->lock);
break;
} else {
cm_spin_unlock(&queue->lock);
find_times++;
if ((find_times % chunk->queue_num) == 0) {
print_no_buffer_log();
cm_sleep(1);
}
}
} while (buf_node == NULL);
return buf_node->data;
}
void mes_free_buf_item(char *buffer)
{
mes_buffer_item_t *buf_item = (mes_buffer_item_t *)(buffer - MES_BUFFER_ITEM_SIZE);
mes_buf_chunk_t *chunk = &g_mes.mes_ctx.msg_pool.chunk[buf_item->chunk_no];
mes_buf_queue_t *queue = &chunk->queues[buf_item->queue_no];
cm_spin_lock(&queue->lock, NULL);
if (queue->count > 0) {
queue->last->next = buf_item;
queue->last = buf_item;
} else {
queue->first = buf_item;
queue->last = buf_item;
}
queue->count++;
cm_spin_unlock(&queue->lock);
return;
}
void mes_destory_buf_pool(message_pool_t *pool)
{
if (pool->buffer != NULL) {
free(pool->buffer);
pool->buffer = NULL;
}
return;
}
char *mes_alloc_pool_buf(message_pool_t *pool)
{
char *msg_buf;
uint32 id = pool->get_no % pool->size;
cm_spin_lock(&pool->lock, NULL);
while (pool->items[id] == NULL) {
cm_spin_unlock(&pool->lock);
cm_spin_sleep();
pool->get_no++;
id = pool->get_no % pool->size;
cm_spin_lock(&pool->lock, NULL);
}
pool->get_no++;
msg_buf = pool->items[id];
pool->items[id] = NULL;
cm_spin_unlock(&pool->lock);
return msg_buf;
}
static void mes_release_pool_buf(message_pool_t *pool, const char *msg_buf)
{
uint32 id;
cm_spin_lock(&pool->lock, NULL);
id = *(uint32 *)(msg_buf - sizeof(uint32));
CM_ASSERT(pool->items[id] == NULL);
pool->items[id] = (char *)msg_buf;
cm_spin_unlock(&pool->lock);
return;
}
void mes_init_send_recv_buf_pool(void)
{
if (g_mes.profile.pipe_type == CS_TYPE_TCP) {
g_send_pool = &g_mes.mes_ctx.msg_pool.big_pool;
g_recv_pool = &g_mes.mes_ctx.msg_pool.big_pool;
} else if (g_mes.profile.pipe_type == CS_TYPE_UC || g_mes.profile.pipe_type == CS_TYPE_UC_RDMA) {
g_send_pool = &g_mes.mes_ctx.msg_pool.big_pool;
g_recv_pool = &g_mes.mes_ctx.msg_pool.big_pool;
} else {
OG_THROW_ERROR_EX(ERR_MES_PARAMETER, "pipe_type %u is invalid", g_mes.profile.pipe_type);
return;
}
return;
}
char *mes_alloc_send_buf(void)
{
return mes_alloc_pool_buf(g_send_pool);
}
void mes_release_send_buf(const char *buffer)
{
return mes_release_pool_buf(g_send_pool, buffer);
}
char *mes_alloc_recv_buf(void)
{
return mes_alloc_pool_buf(g_recv_pool);
}
void mes_release_buf_stat(const char *msg_buf)
{
mes_message_head_t *head = (mes_message_head_t *)msg_buf;
mes_elapsed_stat(head->cmd, MES_TIME_PUT_BUF);
MES_LOG_HEAD_BUF(head, msg_buf);
return;
}
void mes_release_recv_buf(const char *buffer)
{
mes_release_buf_stat(buffer);
mes_release_pool_buf(g_recv_pool, buffer);
return;
}