* Copyright (c) Huawei Technologies Co., Ltd. 2020-2021. All rights reserved.
* gazelle is licensed under the Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
* PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include <securec.h>
#include <rte_errno.h>
#include <rte_cycles.h>
#include <lwip/arch/sys_arch.h>
#include "lstack_mempool.h"
#include "lstack_log.h"
#include "lstack_cfg.h"
#include "common/dpdk_common.h"
#include "lstack_dpdk.h"
#include "lstack_protocol_stack.h"
#include "lstack_unistd.h"
#define MEM_THREAD_TASK_PATH "/proc/%d/task/%d/stat"
#define MEM_THREAD_MAX_PATH 32
#define MEM_THREAD_FLUSH_SIG (SIGRTMIN + 11)
#define MEM_THREAD_MANAGER_FLUSH_MS 100
#define MEM_THREAD_MANAGER_FREE_S 20
#define MEM_THREAD_MANAGER_FREE_MAX 64
struct mem_thread_manager {
struct list_node mt_work_list;
struct list_node mt_free_list;
rte_spinlock_t list_lock;
uint32_t flush_time;
unsigned thread_num;
};
struct mem_thread_group {
char task_path[MEM_THREAD_MAX_PATH];
int tid;
pthread_t thread;
struct list_node mt_node;
struct mem_thread mt_array[PROTOCOL_STACK_MAX];
uint32_t used_time;
bool used_flag;
bool siged_flag;
};
static struct mem_stack g_mem_stack_group[PROTOCOL_STACK_MAX] = {0};
static PER_THREAD struct mem_thread_group *g_mem_thread_group = NULL;
static struct mem_thread_manager g_mem_thread_manager = {0};
static __rte_always_inline
struct mem_stack *mem_stack_get(int stack_id)
{
return &g_mem_stack_group[stack_id];
}
struct rte_mempool *mem_get_mbuf_pool(int stack_id)
{
return g_mem_stack_group[stack_id].mbuf_pool;
}
struct rte_mempool *mem_get_rpc_pool(int stack_id)
{
return g_mem_stack_group[stack_id].rpc_pool;
}
unsigned mem_stack_mbuf_pool_count(int stack_id)
{
const struct mem_stack *ms = mem_stack_get(stack_id);
return rte_mempool_avail_count(ms->mbuf_pool);
}
unsigned mem_stack_rpc_pool_count(int stack_id)
{
const struct mem_stack *ms = mem_stack_get(stack_id);
return rte_mempool_avail_count(ms->rpc_pool);
}
static inline unsigned mem_stack_pool_ring_count(const struct rte_mempool *pool)
{
* when RTE_MAX_LCORE is too large, it's time-consuming
*/
return rte_ring_count(pool->pool_data);
}
static inline bool mem_thread_group_in_used(const struct mem_thread_group *mt_grooup, uint32_t timeout)
{
return mt_grooup->used_flag ||
(sys_now() - mt_grooup->used_time < timeout);
}
static inline void mem_thread_group_used(void)
{
g_mem_thread_group->used_flag = true;
g_mem_thread_group->used_time = sys_now();
}
static inline void mem_thread_group_done(void)
{
g_mem_thread_group->used_flag = false;
}
bool mem_thread_ignore_flush_intr(void)
{
if (likely(g_mem_thread_group != NULL) && g_mem_thread_group->siged_flag) {
g_mem_thread_group->siged_flag = false;
return true;
}
return false;
}
static void mem_thread_cache_flush(struct mem_thread *mt);
static unsigned mem_thread_cache_count(const struct mem_thread *mt);
static void mem_thread_group_action_flush(int signum)
{
struct mem_thread *mt;
int stack_id;
if (g_mem_thread_group == NULL)
return;
g_mem_thread_group->siged_flag = true;
if (mem_thread_group_in_used(g_mem_thread_group, MEM_THREAD_MANAGER_FLUSH_MS))
return;
for (stack_id = 0; stack_id < PROTOCOL_STACK_MAX; stack_id++) {
mt = &g_mem_thread_group->mt_array[stack_id];
mem_thread_cache_flush(mt);
}
}
static int mem_thread_group_register_flush(void)
{
sighandler_t handler;
handler = signal(MEM_THREAD_FLUSH_SIG, mem_thread_group_action_flush);
if (handler == SIG_ERR) {
LSTACK_LOG(ERR, LSTACK, "signal failed\n");
return -1;
}
pthread_unblock_sig(MEM_THREAD_FLUSH_SIG);
return 0;
}
static inline void mem_thread_group_notify_flush(const struct mem_thread_group *mt_group, uint32_t timeout)
{
const struct mem_thread *mt;
int stack_id;
unsigned count = 0;
if (mem_thread_group_in_used(mt_group, timeout))
return;
for (stack_id = 0; stack_id < PROTOCOL_STACK_MAX; stack_id++) {
mt = &mt_group->mt_array[stack_id];
count += mem_thread_cache_count(mt);
}
if (count == 0) {
return;
}
if (pthread_kill(mt_group->thread, MEM_THREAD_FLUSH_SIG) != 0) {
LSTACK_LOG(ERR, LSTACK, "pthread_kill tid %d failed\n", mt_group->tid);
}
}
static inline bool mem_thread_group_exist(const struct mem_thread_group *mt_group)
{
if (access(mt_group->task_path, R_OK) != 0) {
if (errno == ENOENT) {
return false;
}
LSTACK_LOG(ERR, LSTACK, "mem_thread_group_exist access %s failed, errno %d\n",
mt_group->task_path, errno);
}
return true;
}
static void mem_thread_manager_add_work(struct mem_thread_group *mt_group)
{
rte_spinlock_lock(&g_mem_thread_manager.list_lock);
list_add_node(&mt_group->mt_node, &g_mem_thread_manager.mt_work_list);
g_mem_thread_manager.thread_num++;
rte_spinlock_unlock(&g_mem_thread_manager.list_lock);
}
static void mem_thread_group_free(struct mem_thread_group *mt_group)
{
struct mem_thread *mt;
int stack_id;
for (stack_id = 0; stack_id < PROTOCOL_STACK_MAX; stack_id++) {
mt = &mt_group->mt_array[stack_id];
mem_thread_cache_free(mt);
}
free(mt_group);
return;
}
static int mem_thread_group_init(int stack_id)
{
struct mem_thread *mt;
if (rte_lcore_id() < RTE_MAX_LCORE) {
LSTACK_LOG(ERR, LSTACK, "tid %d, lcore_id %u is invalid\n", rte_gettid(), rte_lcore_id());
return -1;
}
if (g_mem_thread_group == NULL) {
g_mem_thread_group = (struct mem_thread_group *)calloc(1, sizeof(struct mem_thread_group));
if (g_mem_thread_group == NULL) {
LSTACK_LOG(ERR, LSTACK, "alloc mem_thread_group failed, stack_id %d\n", stack_id);
return -1;
}
mem_thread_group_register_flush();
g_mem_thread_group->tid = rte_gettid();
g_mem_thread_group->thread = pthread_self();
SYS_FORMAT_NAME(g_mem_thread_group->task_path, sizeof(g_mem_thread_group->task_path),
MEM_THREAD_TASK_PATH, getpid(), g_mem_thread_group->tid);
list_init_node(&g_mem_thread_group->mt_node);
mem_thread_manager_add_work(g_mem_thread_group);
}
mt = &g_mem_thread_group->mt_array[stack_id];
if (mem_thread_cache_init(mt, stack_id) != 0) {
LSTACK_LOG(ERR, LSTACK, "mem_thread_cache_init failed, stack_id %d\n", stack_id);
return -1;
}
return 0;
}
static inline struct mem_thread *mem_thread_group_get(int stack_id)
{
struct mem_thread *mt;
if (likely(g_mem_thread_group != NULL)) {
mt = &g_mem_thread_group->mt_array[stack_id];
if (likely(mt->mbuf_cache != NULL))
return mt;
}
if (mem_thread_group_init(stack_id) != 0) {
LSTACK_LOG(ERR, LSTACK, "mem_thread_group_init failed, stack_id %d\n", stack_id);
return NULL;
}
mt = &g_mem_thread_group->mt_array[stack_id];
return mt;
}
static void mem_thread_manager_flush_all(void)
{
struct list_node *node, *next;
struct mem_thread_group *mt_group;
uint32_t now = sys_now();
rte_spinlock_lock(&g_mem_thread_manager.list_lock);
if (now - g_mem_thread_manager.flush_time < MEM_THREAD_MANAGER_FLUSH_MS) {
rte_spinlock_unlock(&g_mem_thread_manager.list_lock);
return;
}
g_mem_thread_manager.flush_time = now;
list_for_each_node(node, next, &g_mem_thread_manager.mt_work_list) {
mt_group = container_of(node, struct mem_thread_group, mt_node);
if (mt_group == g_mem_thread_group)
continue;
mem_thread_group_notify_flush(mt_group, MEM_THREAD_MANAGER_FLUSH_MS);
}
rte_spinlock_unlock(&g_mem_thread_manager.list_lock);
}
static void *mem_thread_manager_thread(void *arg)
{
struct list_node *node, *next;
struct mem_thread_group *mt_group;
unsigned count = 0;
rte_spinlock_init(&g_mem_thread_manager.list_lock);
list_init_head(&g_mem_thread_manager.mt_work_list);
list_init_head(&g_mem_thread_manager.mt_free_list);
g_mem_thread_manager.flush_time = sys_now();
while(true) {
sleep(MEM_THREAD_MANAGER_FREE_S);
rte_spinlock_lock(&g_mem_thread_manager.list_lock);
list_for_each_node(node, next, &g_mem_thread_manager.mt_free_list) {
mt_group = container_of(node, struct mem_thread_group, mt_node);
list_del_node(node);
mem_thread_group_free(mt_group);
}
list_for_each_node(node, next, &g_mem_thread_manager.mt_work_list) {
count++;
if (count > MEM_THREAD_MANAGER_FREE_MAX) {
* and start traversing from this node next time */
list_del_node(&g_mem_thread_manager.mt_work_list);
list_add_node(&g_mem_thread_manager.mt_work_list, node);
break;
}
mt_group = container_of(node, struct mem_thread_group, mt_node);
if (mem_thread_group_exist(mt_group)) {
mem_thread_group_notify_flush(mt_group, MEM_THREAD_MANAGER_FREE_S * MS_PER_S);
continue;
}
list_del_node(node);
list_add_node(node, &g_mem_thread_manager.mt_free_list);
g_mem_thread_manager.thread_num--;
}
rte_spinlock_unlock(&g_mem_thread_manager.list_lock);
}
return NULL;
}
int mem_thread_manager_init(void)
{
return thread_create("gzmempool", 0, mem_thread_manager_thread, NULL);
}
static inline struct mem_thread *mem_thread_get(int stack_id)
{
if (get_protocol_stack() != NULL)
return NULL;
#if MEMP_DEBUG
if (RTE_PER_LCORE(_lcore_id) < RTE_MAX_LCORE) {
LWIP_DEBUGF(MEMP_DEBUG | LWIPGZ_LOG_FATAL, ("tid %d has invalid rte_lcore_id %u !\n",
rte_gettid(), RTE_PER_LCORE(_lcore_id)));
return NULL;
}
#endif
return mem_thread_group_get(stack_id);
}
struct mem_obj_ops {
void (*init)(struct rte_mempool *mp, void *arg, void *obj, unsigned obj_idx);
unsigned (*get_stack_id)(const void *obj);
struct rte_mempool * (*get_pool)(const void *obj);
};
static __rte_always_inline
void rpc_obj_init(struct rte_mempool *mp, void *arg, void *obj, unsigned obj_idx)
{
int stack_id = *(int *)arg;
struct rpc_msg *msg = obj;
msg->stack_id = stack_id;
}
static __rte_always_inline
unsigned rpc_obj_get_stack_id(const void *obj)
{
return ((const struct rpc_msg *)obj)->stack_id;
}
static __rte_always_inline
struct rte_mempool *rpc_obj_get_pool(const void *obj)
{
int stack_id = rpc_obj_get_stack_id(obj);
return mem_get_rpc_pool(stack_id);
}
static __rte_always_inline
void mbuf_obj_init(struct rte_mempool *mp, void *arg, void *obj, unsigned obj_idx)
{
int stack_id = *(int *)arg;
struct rte_mbuf *mbuf = obj;
struct mbuf_private *priv = mbuf_to_private(mbuf);
priv->stack_id = stack_id;
}
static __rte_always_inline
unsigned mbuf_obj_get_stack_id(const void *obj)
{
return mbuf_to_private((const struct rte_mbuf *)obj)->stack_id;
}
static __rte_always_inline
struct rte_mempool *mbuf_obj_get_pool(const void *obj)
{
int stack_id = mbuf_obj_get_stack_id(obj);
return mem_get_mbuf_pool(stack_id);
}
static const struct mem_obj_ops rpc_obj_ops = {
.init = rpc_obj_init,
.get_stack_id = rpc_obj_get_stack_id,
.get_pool = rpc_obj_get_pool,
};
static const struct mem_obj_ops mbuf_obj_ops = {
.init = mbuf_obj_init,
.get_stack_id = mbuf_obj_get_stack_id,
.get_pool = mbuf_obj_get_pool,
};
struct mempool_ops {
struct rte_mempool *(*create)(const char *name, unsigned n,
unsigned cache_size, unsigned priv_size, unsigned data_room_size, int socket_id);
void (*put_bulk)(struct rte_mempool *pool, void *const *obj_table, unsigned n);
unsigned (*get_bulk)(struct rte_mempool *pool, void **obj_table, unsigned n);
};
static __rte_always_inline
struct rte_mempool *mempool_create(const char *name, unsigned n,
unsigned cache_size, unsigned priv_size, unsigned data_room_size, int socket_id)
{
struct rte_mempool *pool;
LSTACK_LOG(INFO, LSTACK, "name %s, n %u, cache_size %u, priv_size %u, data_room_size %u, socket_id %d, ops_name %s\n",
name, n, cache_size, priv_size, data_room_size, socket_id, MEMPOOL_OPS_NAME);
pool = rte_mempool_create(name, n, data_room_size, cache_size, priv_size, NULL, NULL, NULL, NULL, socket_id, 0);
if (pool != NULL)
rte_mempool_set_ops_byname(pool, MEMPOOL_OPS_NAME, NULL);
return pool;
}
static __rte_always_inline
void mempool_put_bulk(struct rte_mempool *pool, void *const *obj_table, unsigned n)
{
rte_mempool_put_bulk(pool, obj_table, n);
}
static __rte_always_inline
unsigned mempool_get_bulk(struct rte_mempool *pool, void **obj_table, unsigned n)
{
return rte_mempool_get_bulk(pool, obj_table, n) != 0 ? 0 : n;
}
static __rte_always_inline
struct rte_mempool *pkgmbuf_create(const char *name, unsigned n,
unsigned cache_size, unsigned priv_size, unsigned data_room_size, int socket_id)
{
LSTACK_LOG(INFO, LSTACK, "name %s, n %u, cache_size %u, priv_size %u, data_room_size %u, socket_id %d, ops_name %s\n",
name, n, cache_size, priv_size, data_room_size, socket_id, MEMPOOL_OPS_NAME);
return rte_pktmbuf_pool_create_by_ops(name, n, cache_size, priv_size, data_room_size, socket_id, MEMPOOL_OPS_NAME);
}
static __rte_always_inline
void pkgmbuf_put_bulk(struct rte_mempool *pool, void *const *obj_table, unsigned n)
{
rte_mempool_put_bulk(pool, obj_table, n);
}
static __rte_always_inline
unsigned pkgmbuf_get_bulk(struct rte_mempool *pool, void **obj_table, unsigned n)
{
return rte_pktmbuf_alloc_bulk(pool, (struct rte_mbuf **)obj_table, n) != 0 ? 0 : n;
}
static const struct mempool_ops mem_mp_ops = {
.create = mempool_create,
.put_bulk = mempool_put_bulk,
.get_bulk = mempool_get_bulk,
};
static const struct mempool_ops mbuf_mp_ops = {
.create = pkgmbuf_create,
.put_bulk = pkgmbuf_put_bulk,
.get_bulk = pkgmbuf_get_bulk,
};
static struct rte_mempool *mbuf_pool_create(int stack_id, unsigned numa_id)
{
char name[RTE_MEMPOOL_NAMESIZE];
struct rte_mempool *pool;
uint32_t total_bufs;
uint16_t private_size;
uint16_t xdp_metadata = 0;
total_bufs = dpdk_pktmbuf_mempool_num();
if (total_bufs > MEMPOOL_MAX_NUM) {
LSTACK_LOG(ERR, LSTACK, "total_bufs %u out of the dpdk mempool range\n", total_bufs);
return NULL;
}
SYS_FORMAT_NAME(name, RTE_MEMPOOL_NAMESIZE, "%s_%hu", "mbuf_pool", stack_id);
if (xdp_eth_enabled()) {
xdp_metadata = 24;
}
private_size = RTE_ALIGN(sizeof(struct mbuf_private) + xdp_metadata, RTE_CACHE_LINE_SIZE);
pool = mbuf_mp_ops.create(name, total_bufs, MBUFPOOL_CACHE_NUM, private_size, MBUF_DATA_SIZE, numa_id);
if (pool == NULL) {
LSTACK_LOG(ERR, LSTACK, "rte_pktmbuf_pool_create %s failed, rte_errno %d\n", name, rte_errno);
return NULL;
}
return pool;
}
static struct rte_mempool *rpc_pool_create(int stack_id, unsigned numa_id)
{
char name [RTE_MEMPOOL_NAMESIZE];
struct rte_mempool *pool;
uint32_t total_bufs;
total_bufs = RPCPOLL_MAX_NUM;
if (total_bufs > MEMPOOL_MAX_NUM) {
LSTACK_LOG(ERR, LSTACK, "total_bufs %u out of the dpdk mempool range\n", total_bufs);
return NULL;
}
SYS_FORMAT_NAME(name, RTE_MEMPOOL_NAMESIZE, "%s_%hu", "rpc_pool", stack_id);
pool = mem_mp_ops.create(name, total_bufs, MEMPOOL_CACHE_NUM, 0, sizeof(struct rpc_msg), numa_id);
if (pool == NULL) {
LSTACK_LOG(ERR, LSTACK, "rte_mempool_create %s failed, rte_errno %d\n", name, rte_errno);
}
return pool;
}
void mem_stack_pool_free(int stack_id)
{
struct mem_stack *ms = mem_stack_get(stack_id);
if (ms->mbuf_pool != NULL) {
rte_mempool_free(ms->mbuf_pool);
ms->mbuf_pool = NULL;
}
if (ms->rpc_pool != NULL) {
rte_mempool_free(ms->rpc_pool);
ms->rpc_pool = NULL;
}
}
int mem_stack_pool_init(int stack_id, unsigned numa_id)
{
struct mem_stack *ms = mem_stack_get(stack_id);
ms->mbuf_pool = mbuf_pool_create(stack_id, numa_id);
if (ms->mbuf_pool == NULL) {
return -1;
}
ms->rpc_pool = rpc_pool_create(stack_id, numa_id);
if (ms->rpc_pool == NULL) {
mem_stack_pool_free(stack_id);
return -1;
}
rte_mempool_obj_iter(ms->mbuf_pool, mbuf_obj_ops.init, &stack_id);
rte_mempool_obj_iter(ms->rpc_pool, rpc_obj_ops.init, &stack_id);
return 0;
}
int mem_stack_mpcache_init(int stack_id, unsigned cpu_id)
{
struct mem_stack *ms = mem_stack_get(stack_id);
if (ms->mbuf_pool == NULL) {
LSTACK_LOG(ERR, LSTACK, "mem_stack_get stack_id %d failed\n", stack_id);
return -1;
}
RTE_PER_LCORE(_lcore_id) = cpu_id;
ms->mbuf_mpcache = rte_mempool_default_cache(ms->mbuf_pool, rte_lcore_id());
ms->migrate_watermark = ms->mbuf_mpcache->size / 8;
LSTACK_LOG(INFO, LSTACK, "tid %d, stack_id %d, lcore_id %u, migrate_watermark %u\n",
rte_gettid(), stack_id, rte_lcore_id(), ms->migrate_watermark);
return 0;
}
static void mem_thread_cache_flush(struct mem_thread *mt)
{
struct mem_stack *ms = mem_stack_get(mt->stack_id);
void *obj_table[BUF_BULK_MAX_NUM];
unsigned num;
if (mt->mbuf_migrate_ring != NULL) {
LWIP_DEBUGF(MEMP_DEBUG, ("%s(mem_thread=%p, stack_id=%d, mbuf_migrate_ring count=%u)\n",
__FUNCTION__, mt, mt->stack_id, rte_ring_count(mt->mbuf_migrate_ring)));
while (true) {
num = rte_ring_sc_dequeue_burst(mt->mbuf_migrate_ring, obj_table, BUF_BULK_MAX_NUM, NULL);
if (num == 0)
break;
mbuf_mp_ops.put_bulk(ms->mbuf_pool, obj_table, num);
}
}
if (mt->mbuf_cache != NULL) {
LWIP_DEBUGF(MEMP_DEBUG, ("%s(mem_thread=%p, stack_id=%d, mbuf_cache count=%u)\n",
__FUNCTION__, mt, mt->stack_id, buf_cache_count(mt->mbuf_cache)));
while (true) {
num = LWIP_MIN(buf_cache_count(mt->mbuf_cache), BUF_BULK_MAX_NUM);
num = buf_cache_pop_bulk(mt->mbuf_cache, obj_table, num, NULL);
if (num == 0)
break;
mbuf_mp_ops.put_bulk(ms->mbuf_pool, obj_table, num);
}
buf_cache_reset_watermark(mt->mbuf_cache);
}
if (mt->rpc_cache != NULL) {
LWIP_DEBUGF(MEMP_DEBUG, ("%s(mem_thread=%p, stack_id=%d, rpc_cache count=%u)\n",
__FUNCTION__, mt, mt->stack_id, buf_cache_count(mt->rpc_cache)));
while (true) {
num = LWIP_MIN(buf_cache_count(mt->rpc_cache), BUF_BULK_MAX_NUM);
num = buf_cache_pop_bulk(mt->rpc_cache, obj_table, num, NULL);
if (num == 0)
break;
mem_mp_ops.put_bulk(ms->rpc_pool, obj_table, num);
}
buf_cache_reset_watermark(mt->rpc_cache);
}
}
static unsigned mem_thread_cache_count(const struct mem_thread *mt)
{
unsigned count = 0;
if (mt->mbuf_migrate_ring != NULL) {
count += rte_ring_count(mt->mbuf_migrate_ring);
}
if (mt->mbuf_cache != NULL) {
count += buf_cache_count(mt->mbuf_cache);
}
if (mt->rpc_cache != NULL) {
count += buf_cache_count(mt->rpc_cache);
}
return count;
}
void mem_thread_cache_free(struct mem_thread *mt)
{
mem_thread_cache_flush(mt);
if (mt->mbuf_migrate_ring != NULL) {
rte_ring_free(mt->mbuf_migrate_ring);
mt->mbuf_migrate_ring = NULL;
}
if (mt->mbuf_cache != NULL) {
buf_cache_free(mt->mbuf_cache);
mt->mbuf_cache = NULL;
}
if (mt->rpc_cache != NULL) {
buf_cache_free(mt->rpc_cache);
mt->rpc_cache = NULL;
}
}
int mem_thread_cache_init(struct mem_thread *mt, int stack_id)
{
mt->stack_id = stack_id;
if (get_global_cfg_params()->mem_async_mode) {
char name [RTE_MEMPOOL_NAMESIZE];
SYS_FORMAT_NAME(name, RTE_MEMPOOL_NAMESIZE, "%s_%p", "migrate_ring", mt);
mt->mbuf_migrate_ring = rte_ring_create(name,
LWIP_MAX(get_global_cfg_params()->mem_cache_num, MIGRATE_RING_MIN_NUM),
rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
if (mt->mbuf_migrate_ring == NULL) {
return -1;
}
}
mt->mbuf_cache = buf_cache_create(get_global_cfg_params()->mem_cache_num);
if (mt->mbuf_cache == NULL) {
mem_thread_cache_free(mt);
return -1;
}
mt->rpc_cache = buf_cache_create(BUF_CACHE_MIN_NUM);
if (mt->rpc_cache == NULL) {
mem_thread_cache_free(mt);
return -1;
}
return 0;
}
struct mem_thread *mem_thread_migrate_get(int stack_id)
{
struct mem_thread *mt = mem_thread_get(stack_id);
if (mt == NULL || mt->mbuf_migrate_ring == NULL)
return NULL;
return mt;
}
static inline void mem_preinit_pbuf(struct pbuf *p);
void mem_mbuf_migrate_enqueue(struct mem_thread *mt, unsigned n)
{
struct mem_stack *ms;
struct rte_mempool_cache *mpcache;
int stack_id;
unsigned num, i;
void **obj_table;
stack_id = get_protocol_stack()->stack_idx;
ms = mem_stack_get(stack_id);
mpcache = ms->mbuf_mpcache;
mt->stk_migrate_count += n;
if (mt->stk_migrate_count < BUF_CACHE_WATERSTEP_MIN)
return;
if (mpcache->len < ms->migrate_watermark)
return;
if (mem_stack_pool_ring_count(ms->mbuf_pool) < MBUFPOOL_RESERVE_NUM) {
mem_thread_manager_flush_all();
mt->stk_migrate_count = 0;
return;
}
num = LWIP_MIN(mpcache->len - ms->migrate_watermark,
mt->stk_migrate_count);
obj_table = &mpcache->objs[mpcache->len - num];
for (i = 0; i < num; i++) {
rte_pktmbuf_reset(obj_table[i]);
mem_preinit_pbuf(mbuf_to_pbuf(obj_table[i]));
}
num = rte_ring_sp_enqueue_bulk(mt->mbuf_migrate_ring, obj_table, num, NULL);
if (num > 0) {
mpcache->len -= num;
mt->stk_migrate_count -= num;
} else {
mt->stk_migrate_count = 0;
}
}
void mem_mbuf_migrate_dequeue(struct mem_thread *mt)
{
struct buf_cache *cache;
unsigned num;
void **obj_table;
if (mt->mbuf_migrate_ring == NULL)
return;
cache = mt->mbuf_cache;
if (cache->head > (cache->watermark >> 1))
return;
num = cache->capacity - cache->head;
obj_table = &cache->objs[cache->head];
num = rte_ring_sc_dequeue_burst(mt->mbuf_migrate_ring, obj_table, num, NULL);
cache->head += num;
}
static inline
void pool_put_with_mpcache(struct rte_mempool *pool, struct rte_mempool_cache* mpcache, void *obj)
{
if (mpcache->len >= mpcache->flushthresh) {
rte_mempool_ops_enqueue_bulk(pool, &mpcache->objs[mpcache->size],
mpcache->len - mpcache->size);
mpcache->len = mpcache->size;
}
mpcache->objs[mpcache->len] = obj;
mpcache->len++;
}
static inline
void pool_put_with_bufcache(struct rte_mempool *pool, struct buf_cache* cache, void *obj)
{
if (cache->head >= cache->flushthresh) {
buf_cache_sub_watermark(cache);
rte_mempool_ops_enqueue_bulk(pool, &cache->objs[cache->watermark],
cache->head - cache->watermark);
cache->head = cache->watermark;
}
cache->objs[cache->head] = obj;
cache->head++;
}
static unsigned pool_get_bulk_with_cache(const struct mempool_ops *pool_ops,
struct rte_mempool *pool, struct buf_cache *cache,
void **obj_table, unsigned n, unsigned pool_count)
{
unsigned ret;
unsigned count = 0;
unsigned get_count;
ret = buf_cache_pop_bulk(cache, obj_table, n, &count);
if (ret > 0) {
return n;
}
ret = pool_ops->get_bulk(pool, obj_table, n);
if (unlikely(ret == 0)) {
LSTACK_LOG(ERR, LSTACK, "pool %s get_bulk failed, n %u, count %u\n",
pool->name, n, mem_stack_pool_ring_count(pool));
return 0;
}
ret = MBUFPOOL_RESERVE_NUM + BUF_CACHE_MIN_NUM * g_mem_thread_manager.thread_num;
if (unlikely(ret > pool_count)) {
buf_cache_reset_watermark(cache);
return n;
}
buf_cache_add_watermark(cache);
if (count >= cache->watermark) {
return n;
}
get_count = cache->watermark - count;
LWIP_DEBUGF(MEMP_DEBUG, ("%s(cache=%p, watermark=%u, get_count=%u)\n",
__FUNCTION__, cache, cache->watermark, get_count));
ret = pool_ops->get_bulk(pool, &cache->objs[cache->head], get_count);
if (unlikely(ret == 0)) {
LSTACK_LOG(ERR, LSTACK, "pool %s get_bulk failed, n %u, count %u\n",
pool->name, get_count, mem_stack_pool_ring_count(pool));
} else {
cache->head += get_count;
}
return n;
}
static void pool_put_bulk_with_cache(const struct mempool_ops *pool_ops,
struct rte_mempool *pool, struct buf_cache *cache,
void *const *obj_table, unsigned n)
{
unsigned ret;
unsigned count;
unsigned free_count = 0;
unsigned put_count;
ret = buf_cache_push_bulk(cache, obj_table, n, &free_count);
if (ret > 0) {
return;
}
pool_ops->put_bulk(pool, obj_table, n);
buf_cache_sub_watermark(cache);
count = buf_cache_get_capacity(cache) - free_count;
if (count <= cache->watermark) {
return;
}
put_count = count - cache->watermark;
LWIP_DEBUGF(MEMP_DEBUG, ("%s(cache=%p, watermark=%u, put_count=%u)\n",
__FUNCTION__, cache, cache->watermark, put_count));
pool_ops->put_bulk(pool, &cache->objs[cache->head - put_count], put_count);
cache->head -= put_count;
return;
}
void *mem_get_rpc(int stack_id, bool reserve)
{
struct mem_stack *ms = mem_stack_get(stack_id);
struct mem_thread *mt = mem_thread_get(stack_id);
unsigned ret = 0;
unsigned pool_count;
void *obj;
pool_count = mem_stack_pool_ring_count(ms->rpc_pool);
if (reserve && pool_count < RPCPOOL_RESERVE_NUM) {
goto out;
}
if (mt == NULL) {
ret = mem_mp_ops.get_bulk(ms->rpc_pool, &obj, 1);
} else {
mem_thread_group_used();
ret = pool_get_bulk_with_cache(&mem_mp_ops, ms->rpc_pool, mt->rpc_cache,
&obj, 1, pool_count);
mem_thread_group_done();
}
LWIP_DEBUGF(MEMP_DEBUG, ("%s(stack_id=%d, obj=%p)\n", __FUNCTION__, stack_id, obj));
out:
if (unlikely(ret == 0)) {
mem_thread_manager_flush_all();
return NULL;
}
return obj;
}
void mem_put_rpc(void *obj)
{
unsigned stack_id = rpc_obj_ops.get_stack_id(obj);
struct mem_stack *ms = mem_stack_get(stack_id);
struct mem_thread *mt = mem_thread_get(stack_id);
LWIP_DEBUGF(MEMP_DEBUG, ("%s(stack_id=%d, obj=%p)\n", __FUNCTION__, stack_id, obj));
if (mt == NULL) {
mem_mp_ops.put_bulk(ms->rpc_pool, &obj, 1);
} else {
mem_thread_group_used();
pool_put_bulk_with_cache(&mem_mp_ops, ms->rpc_pool, mt->rpc_cache, &obj, 1);
mem_thread_group_done();
}
}
unsigned mem_get_mbuf_bulk(int stack_id, struct rte_mbuf **mbuf_table, unsigned n, bool reserve)
{
struct mem_stack *ms = mem_stack_get(stack_id);
struct mem_thread *mt = mem_thread_get(stack_id);
unsigned ret = 0;
unsigned pool_count;
if (unlikely(n == 0)) {
return 0;
}
pool_count = mem_stack_pool_ring_count(ms->mbuf_pool);
if (reserve && pool_count < MBUFPOOL_RESERVE_NUM + n) {
goto out;
}
if (mt == NULL) {
ret = mbuf_mp_ops.get_bulk(ms->mbuf_pool, (void **)mbuf_table, n);
} else {
mem_thread_group_used();
mem_mbuf_migrate_dequeue(mt);
ret = pool_get_bulk_with_cache(&mbuf_mp_ops, ms->mbuf_pool, mt->mbuf_cache,
(void **)mbuf_table, n, pool_count);
mem_thread_group_done();
}
#if MEMP_DEBUG
for (unsigned i = 0; i < ret; ++i) {
LWIP_DEBUGF(MEMP_DEBUG, ("%s(stack_id=%d, n=%u, mbuf_table[%u]=%p, pbuf=%p)\n",
__FUNCTION__, stack_id, n, i, mbuf_table[i], mbuf_to_pbuf(mbuf_table[i])));
}
#endif
out:
if (unlikely(ret == 0)) {
mem_thread_manager_flush_all();
}
return ret;
}
static void mem_put_mbuf_bulk_by_pbuf(struct rte_mbuf *const *mbuf_table, unsigned n)
{
unsigned stack_id = mbuf_obj_ops.get_stack_id(mbuf_table[0]);
struct mem_stack *ms = mem_stack_get(stack_id);
struct mem_thread *mt = mem_thread_get(stack_id);
if (unlikely(n == 0)) {
return;
}
#if MEMP_DEBUG
for (unsigned i = 0; i < n; ++i) {
LWIP_DEBUGF(MEMP_DEBUG, ("%s(stack_id=%d, n=%u, mbuf_table[%u]=%p, pbuf=%p)\n",
__FUNCTION__, stack_id, n, i, mbuf_table[i], mbuf_to_pbuf(mbuf_table[i])));
}
#endif
if (mt == NULL) {
mbuf_mp_ops.put_bulk(ms->mbuf_pool, (void *const *)mbuf_table, n);
} else {
mem_thread_group_used();
pool_put_bulk_with_cache(&mbuf_mp_ops, ms->mbuf_pool, mt->mbuf_cache, (void *const *)mbuf_table, n);
mem_thread_group_done();
}
}
void mem_put_mbuf_bulk(struct rte_mbuf *const *mbuf_table, unsigned n)
{
unsigned i;
for (i = 0; i < n; ++i) {
LWIP_DEBUGF(MEMP_DEBUG, ("%s(stack_id=%d, n=%u, mbuf_table[%u]=%p, pbuf=%p)\n",
__FUNCTION__, mbuf_obj_ops.get_stack_id(mbuf_table[i]),
n, i, mbuf_table[i], mbuf_to_pbuf(mbuf_table[i])));
rte_pktmbuf_free(mbuf_table[i]);
}
}
unsigned mem_get_pbuf_bulk(int stack_id, struct pbuf **pbuf_table, unsigned n, bool reserve)
{
struct rte_mbuf **mbuf_table = (struct rte_mbuf **)pbuf_table;
unsigned ret, i;
ret = mem_get_mbuf_bulk(stack_id, mbuf_table, n, reserve);
if (unlikely(ret == 0)) {
struct protocol_stack *stack = get_protocol_stack_by_id(stack_id);
stack->stats.tx_allocmbuf_fail++;
return 0;
}
for (i = 0; i < (n & ~0x3); i += 4) {
pbuf_table[i] = mbuf_to_pbuf(mbuf_table[i]);
pbuf_table[i + 1] = mbuf_to_pbuf(mbuf_table[i + 1]);
pbuf_table[i + 2] = mbuf_to_pbuf(mbuf_table[i + 2]);
pbuf_table[i + 3] = mbuf_to_pbuf(mbuf_table[i + 3]);
}
switch (n & 0x3) {
case 3:
pbuf_table[i] = mbuf_to_pbuf(mbuf_table[i]);
++i;
case 2:
pbuf_table[i] = mbuf_to_pbuf(mbuf_table[i]);
++i;
case 1:
pbuf_table[i] = mbuf_to_pbuf(mbuf_table[i]);
++i;
}
return n;
}
void mem_preput_pbuf(struct pbuf *p)
{
struct rte_mbuf *m = pbuf_to_mbuf(p);
p->mbuf_refcnt = rte_mbuf_refcnt_read(m);
if (p->mbuf_refcnt == 1) {
rte_pktmbuf_reset(m);
}
}
static __rte_always_inline
struct rte_mbuf *pbuf_to_mbuf_prefree(struct pbuf *p)
{
if (unlikely(p == NULL))
return NULL;
if (p->next != NULL)
p->next = NULL;
struct rte_mbuf *m = pbuf_to_mbuf(p);
#if MEMP_DEBUG
if (rte_mbuf_refcnt_read(m) > 1) {
LWIP_DEBUGF(MEMP_DEBUG, ("%s(mbuf=%p, pbuf=%p, refcnt=%u)\n",
__FUNCTION__, m, p, rte_mbuf_refcnt_read(m)));
}
#endif
if (p->mbuf_refcnt != 1) {
m = rte_pktmbuf_prefree_seg(m);
if (m != NULL) {
rte_pktmbuf_reset(m);
}
}
return m;
}
void mem_put_pbuf_bulk(struct pbuf *const *pbuf_table, unsigned n)
{
struct rte_mbuf *mbuf_table[BUF_BULK_MAX_NUM];
unsigned i, copied, batch, bulk_num;
copied = 0;
while (copied < n) {
batch = LWIP_MIN(n - copied, BUF_BULK_MAX_NUM);
bulk_num = 0;
for (i = 0; i < batch; ++i, ++copied) {
mbuf_table[bulk_num] = pbuf_to_mbuf_prefree(pbuf_table[copied]);
if (mbuf_table[bulk_num] != NULL) {
++bulk_num;
}
}
mem_put_mbuf_bulk_by_pbuf(mbuf_table, bulk_num);
}
}
void mem_put_pbuf_list_bulk(struct pbuf *const *pbuf_table, unsigned n)
{
unsigned stack_id = mbuf_obj_ops.get_stack_id(pbuf_to_mbuf(pbuf_table[0]));
struct mem_stack *ms = mem_stack_get(stack_id);
struct mem_thread *mt = mem_thread_get(stack_id);
struct pbuf *q, *next;
struct rte_mbuf *mbuf;
if (mt != NULL)
mem_thread_group_used();
for (unsigned i = 0; i < n; ++i) {
q = pbuf_table[i];
while (q != NULL) {
next = q->next;
q->next = NULL;
q->ref--;
if (q->ref > 0)
break;
mbuf = pbuf_to_mbuf_prefree(q);
if (mbuf == NULL)
break;
q = next;
if (mt == NULL) {
pool_put_with_mpcache(ms->mbuf_pool, ms->mbuf_mpcache, mbuf);
} else {
pool_put_with_bufcache(ms->mbuf_pool, mt->mbuf_cache, mbuf);
}
LWIP_DEBUGF(MEMP_DEBUG, ("%s(stack_id=%d, n=%u, mbuf_table[%u]=%p, pbuf=%p)\n",
__FUNCTION__, stack_id, n, i, mbuf, q));
}
}
if (mt != NULL)
mem_thread_group_done();
return;
}
struct pbuf *mem_get_pbuf(int stack_id, bool reserve)
{
int ret;
struct rte_mbuf *mbuf;
if (stack_id < 0 || stack_id >= PROTOCOL_STACK_MAX)
stack_id = get_protocol_stack()->stack_idx;
ret = mem_get_mbuf_bulk(stack_id, &mbuf, 1, reserve);
if (unlikely(ret == 0)) {
struct protocol_stack *stack = get_protocol_stack_by_id(stack_id);
stack->stats.tx_allocmbuf_fail++;
return NULL;
}
return mbuf_to_pbuf(mbuf);
}
void mem_put_pbuf(struct pbuf *p)
{
struct rte_mbuf *mbuf = pbuf_to_mbuf_prefree(p);
if (mbuf != NULL) {
mem_put_mbuf_bulk_by_pbuf(&mbuf, 1);
}
}
unsigned mem_extcache_get_pbuf_bulk(int stack_id, struct pbuf **pbuf_table, unsigned n, bool reserve, struct pbuf **extcache_list)
{
unsigned ret;
struct pbuf *p;
for (int i = 0; i < n; ++i) {
p = *extcache_list;
if (p != NULL) {
*extcache_list = p->next;
p->next = NULL;
pbuf_table[i] = p;
} else {
ret = mem_get_pbuf_bulk(stack_id, &pbuf_table[i], n - i, reserve);
if (unlikely(ret == 0)) {
mem_put_pbuf_bulk(pbuf_table, i);
return 0;
}
break;
}
}
return n;
}
struct pbuf *mem_extcache_get_pbuf(int stack_id, bool reserve, struct pbuf **extcache_list)
{
struct pbuf *p;
p = *extcache_list;
if (p != NULL) {
*extcache_list = p->next;
p->next = NULL;
} else {
p = mem_get_pbuf(stack_id, reserve);
}
return p;
}
void mem_extcache_put_pbuf(struct pbuf *h, struct pbuf *t, struct pbuf **extcache_list)
{
if (get_global_cfg_params()->stack_mode_rtc) {
pbuf_free(h);
return;
}
if (*extcache_list == NULL) {
*extcache_list = h;
} else {
if (t == NULL)
t = pbuf_list_tail(h);
t->next = *extcache_list;
*extcache_list = h;
}
}
void mem_extcache_flush_pbuf(struct pbuf **extcache_list)
{
if (get_global_cfg_params()->stack_mode_rtc) {
return;
}
struct pbuf *p = *extcache_list;
if (p != NULL) {
mem_put_pbuf_list_bulk(&p, 1);
*extcache_list = NULL;
}
}
static inline void mem_preinit_pbuf(struct pbuf *p)
{
mem_init_pbuf(p, 0, 0, 0, PBUF_POOL_PREINIT);
}
void mem_init_pbuf(struct pbuf *p, pbuf_layer layer, uint16_t tot_len, uint16_t len, pbuf_type type)
{
struct pbuf_custom *pc;
struct rte_mbuf *mbuf;
void *data;
* so ignore PBUF_POOL_PREINIT at this time. */
if (layer == PBUF_TRANSPORT && p->type_internal == PBUF_POOL_PREINIT) {
p->payload = (uint8_t *)p->payload + LWIP_MEM_ALIGN_SIZE((uint16_t)layer);
p->type_internal = type;
p->len = len;
p->tot_len = tot_len;
return;
}
pc = (struct pbuf_custom *)p;
mbuf = pbuf_to_mbuf(p);
data = rte_pktmbuf_mtod(mbuf, void *);
pbuf_alloced_custom(layer, len, type, pc, data, MBUF_PAYLOAD_SIZE);
p->tot_len = tot_len;
pc->custom_free_function = mem_put_pbuf;
}