* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
#include <sys/types.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <stdbool.h>
#include <pthread.h>
#include "atomic_lock.h"
#include "atomic_ref.h"
#include "securec.h"
#include "ascend_hal.h"
#include "drv_buff_common.h"
#include "drv_buff_unibuff.h"
#include "drv_buff_mbuf.h"
#include "esched_user_interface.h"
#include "pbl_uda_user.h"
#include "queue_user_manage.h"
#include "queue_interface.h"
#include "queue.h"
#define MAX_STR_LEN 128
#define LOG_TIME_INTERVAL 100
#define MAX_ENQUE_TIME (38 * 100000)
#define QUEUE_DEPT_RES_SIZE 2
#define MAX_DEQUEUE_TRY_TIMES 3
#define ENQUE_EVENT_TIMEOUT (38 * 1000000)
typedef struct enque_timestamps {
uint64 total_start_timestamp;
uint64 overwrite_start_timstamp;
uint64 overwrite_end_timestamp;
uint64 submit_start_timestamp;
uint64 kern_start_timestamp;
uint64 kern_submit_timstamp;
uint64 kern_end_timestamp;
uint64 submit_end_timestamp;
uint64 total_end_timestamp;
} enque_timestamps;
THREAD pid_t g_cur_pid = 0;
enum queue_delay_action {
QUEUE_DELAY_ENTRY,
QUEUE_DELAY_OUT,
QUEUE_DELAY_ACTION_MAX
};
struct queue_delay_trace {
unsigned int time_threshold;
unsigned int cost_time;
unsigned int start_time;
unsigned int pid;
unsigned int rsv[4];
};
static inline unsigned int queue_get_tail(struct queue_manages *que_mng, unsigned int qid)
{
unsigned int depth = queue_get_local_depth(qid);
unsigned int tail = atomic_value_get(&que_mng->tail_status);
return tail % depth;
}
static inline bool queue_enque_set_tail(struct queue_manages *que_mng, unsigned int depth, unsigned int qid)
{
(void)qid;
if (depth == 0) {
return false;
}
unsigned int tail = (queue_get_orig_tail(que_mng) + 1) % depth;
return atomic_value_set(&que_mng->tail_status, tail);
}
static inline unsigned int get_queue_size(unsigned int head, unsigned int tail, unsigned int depth)
{
return (head <= tail) ? (tail - head) : (tail + depth - head);
}
static inline unsigned long long queue_get_head_value(struct queue_manages *que_manage)
{
return (que_manage->queue_head.head_value);
}
static inline bool is_queue_full(struct queue_manages *que_manage, unsigned int qid)
{
unsigned int depth = queue_get_local_depth(qid);
if ((depth < MIN_VALID_QUEUE_DEPTH) || (depth > MAX_QUEUE_DEPTH)) {
return true;
}
return (que_manage->queue_head.head_info.head == ((queue_get_orig_tail(que_manage) + 1) % depth)) ? true : false;
}
static inline unsigned int get_que_size_by_mng(struct queue_manages *que_manage, unsigned int qid)
{
unsigned int head, tail, size;
tail = queue_get_orig_tail(que_manage);
head = que_manage->queue_head.head_info.head;
size = get_queue_size(head, tail, queue_get_local_depth(qid));
return size % MAX_QUEUE_DEPTH;
}
static bool queue_device_invalid(unsigned int dev_id)
{
return (dev_id >= MAX_DEVICE);
}
static inline bool is_queue_empty(struct queue_manages *que_manage)
{
return (que_manage->queue_head.head_info.head == queue_get_orig_tail(que_manage)) ? true : false;
}
static inline unsigned long long queue_head_inc(union atomic_queue_head cur_head_value, unsigned int depth)
{
union atomic_queue_head new_head_value;
unsigned int new_head = cur_head_value.head_info.head + 1;
new_head_value.head_info.head = ((new_head >= depth) ? (0) : (new_head));
new_head_value.head_info.index = cur_head_value.head_info.index + 1;
return new_head_value.head_value;
}
static drvError_t get_queue_manage_by_qid(unsigned int devid, unsigned int qid, struct queue_manages **queue_manage)
{
struct queue_manages *local_manage = NULL;
if (queue_device_invalid(devid)) {
QUEUE_LOG_ERR("para is error. (dev_id=%u)\n", devid);
return DRV_ERROR_INVALID_DEVICE;
}
if (qid >= MAX_SURPORT_QUEUE_NUM) {
QUEUE_LOG_ERR("para is error. (qid=%u)\n", qid);
return DRV_ERROR_INVALID_VALUE;
}
local_manage = queue_get_local_mng(qid);
if (local_manage == NULL) {
QUEUE_LOG_WARN("Queue local manage is null. (qid=%u)\n", qid);
return DRV_ERROR_NOT_EXIST;
}
*queue_manage = local_manage;
return DRV_ERROR_NONE;
}
drvError_t queue_init_local(unsigned int dev_id)
{
static pthread_mutex_t g_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
static THREAD int g_queue_init_status[MAX_DEVICE] = {0};
static THREAD bool flag = false;
drvError_t ret;
(void)pthread_mutex_lock(&g_queue_mutex);
if (g_queue_init_status[dev_id] == QUEUE_INITED) {
(void)pthread_mutex_unlock(&g_queue_mutex);
return DRV_ERROR_REPEATED_INIT;
}
if (flag == false) {
ret = queue_dc_init();
if (ret != 0) {
(void)pthread_mutex_unlock(&g_queue_mutex);
return ret;
}
g_cur_pid = GETPID();
flag = true;
}
g_queue_init_status[dev_id] = QUEUE_INITED;
(void)pthread_mutex_unlock(&g_queue_mutex);
QUEUE_RUN_LOG_INFO("init queue success. (dev_id=%u)\n", dev_id);
return DRV_ERROR_NONE;
}
static drvError_t queue_create_para_check(unsigned int dev_id, const QueueAttr *que_attr, unsigned int *qid)
{
(void)qid;
unsigned long len;
(void)dev_id;
if ((que_attr->depth < MIN_VALID_QUEUE_DEPTH) || (que_attr->depth > MAX_QUEUE_DEPTH) ||
(que_attr->workMode > QUEUE_MODE_PULL)) {
QUEUE_LOG_ERR("Input para error. (depth=%u, work_mode=%u, flowctrl_flag=%d, drop_time=%ums).\n",
que_attr->depth, que_attr->workMode, que_attr->flowCtrlFlag, que_attr->flowCtrlDropTime);
return DRV_ERROR_INVALID_VALUE;
}
len = strnlen(que_attr->name, MAX_STR_LEN);
if (len >= MAX_STR_LEN) {
QUEUE_LOG_ERR("Input para error. (name_len=%lu)\n", len);
return DRV_ERROR_INVALID_VALUE;
}
return DRV_ERROR_NONE;
}
static drvError_t queue_init_mng_info(struct queue_manages *que_mng, const QueueAttr *que_attr,
unsigned int qid, unsigned int dev_id)
{
int ret;
struct timeval create_time;
init_atomic_lock(&que_mng->merge_atomic_lock);
ret = strncpy_s(que_mng->name, MAX_STR_LEN, que_attr->name, strnlen(que_attr->name, MAX_STR_LEN));
if (ret != 0) {
QUEUE_LOG_ERR("memcpy queue name failed. (qid=%u; name=%s; ret=%d)\n", qid, que_attr->name, ret);
return DRV_ERROR_INVALID_VALUE;
}
que_mng->dev_id = dev_id;
que_mng->id = qid;
que_mng->queue_head.head_value = 0;
que_get_time(&create_time);
que_mng->create_time = (unsigned long)(create_time.tv_usec + create_time.tv_sec * USEC_PER_SEC);
que_mng->creator_pid = getpid();
atomic_value_init(&que_mng->tail_status);
que_mng->enque_cas = 0;
que_mng->deque_cas = 0;
que_mng->fctl_flag = que_attr->flowCtrlFlag;
que_mng->drop_time = que_attr->flowCtrlDropTime;
que_mng->over_write = que_attr->overWriteFlag;
que_mng->work_mode = (int)((que_attr->workMode != (unsigned int)QUEUE_MODE_PULL) ? QUEUE_MODE_PUSH : QUEUE_MODE_PULL);
que_mng->merge_idx = INVALID_QUEUE_MERGE_IDX;
que_mng->remote_devid = QUEUE_INVALID_VALUE;
que_mng->remote_qid = QUEUE_INVALID_VALUE;
que_mng->remote_devpid = QUE_INTER_DEV_INVALID_VALUE;
que_mng->remote_grpid = QUE_INTER_DEV_INVALID_VALUE;
que_mng->valid = QUEUE_CREATED;
que_mng->inter_dev_state = QUEUE_STATE_DISABLED;
return DRV_ERROR_NONE;
}
static drvError_t queue_create_local(unsigned int dev_id, const QueueAttr *que_attr, unsigned int *qid)
{
struct queue_local_info local_info;
drvError_t ret;
ret = queue_create_para_check(dev_id, que_attr, qid);
if (ret != DRV_ERROR_NONE) {
return ret;
}
ret = queue_create(que_attr->depth, qid, &local_info);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_ERR("queue_create failed. (device_id=%u; ret=%d)\n", dev_id, ret);
return ret;
}
ret = queue_init_mng_info(local_info.que_mng, que_attr, (*qid), dev_id);
if (ret != DRV_ERROR_NONE) {
queue_delete(*qid, (void *)local_info.entity);
QUEUE_LOG_ERR("init manage info failed. (device_id=%u; queue_id=%u; depth=%u; ret=%d)\n",
dev_id, *qid, que_attr->depth, ret);
return ret;
}
queue_set_local_info(*qid, &local_info);
QUEUE_RUN_LOG_INFO("create queue success. (device_id=%u; qid=%u; depth=%u; name=%s; mode=%u; freq=%llu)\n",
dev_id, *qid, que_attr->depth, que_attr->name, que_attr->workMode, esched_get_sys_freq());
return DRV_ERROR_NONE;
}
static drvError_t queue_grant_para_check(unsigned int dev_id, unsigned int qid, int pid)
{
(void)dev_id;
if (pid <= 0) {
QUEUE_LOG_ERR("pid not exist. (qid=%u, pid=%d)\n", qid, pid);
return DRV_ERROR_INVALID_VALUE;
}
return DRV_ERROR_NONE;
}
STATIC drvError_t queue_grant_local(unsigned int dev_id, unsigned int qid, int pid, QueueShareAttr attr)
{
struct queue_manages *que_manage = NULL;
drvError_t ret;
ret = queue_grant_para_check(dev_id, qid, pid);
if (ret != DRV_ERROR_NONE) {
return ret;
}
que_manage = queue_get_local_mng(qid);
if (que_manage == NULL) {
QUEUE_LOG_ERR("Queue local manage is null. (dev_id=%u, qid=%u)\n", dev_id, qid);
return DRV_ERROR_NOT_EXIST;
}
if (que_manage->valid != QUEUE_CREATED) {
QUEUE_LOG_ERR("queue is not created. (qid=%u)\n", qid);
return DRV_ERROR_NOT_EXIST;
}
QUEUE_RUN_LOG_INFO("queue grant res. (dev_id=%u; qid=%d; pid=%u; manage=%u; read=%u; write=%u; ret=%d)\n",
dev_id, qid, pid, attr.manage, attr.read, attr.write, ret);
return ret;
}
drvError_t queue_attach_local(unsigned int dev_id, unsigned int qid, int time_out)
{
struct queue_manages *que_manage = NULL;
struct queue_local_info local_info;
drvError_t ret;
que_manage = queue_get_local_mng(qid);
if (que_manage != NULL) {
return DRV_ERROR_NONE;
}
ret = queue_attach(qid, g_cur_pid, time_out, &local_info);
if (ret != 0) {
QUEUE_LOG_ERR("Queue attach failed.(qid=%u; timeout=%dms; ret=%d)\n", qid, time_out, ret);
return ret;
}
queue_set_local_info(qid, &local_info);
QUEUE_RUN_LOG_INFO("queue attach success. (qid=%u, dev=%u, timeout=%dms)\n", qid, dev_id, time_out);
return DRV_ERROR_NONE;
}
Function: When deleting a queue, release the mbuf memory.
************************************************************************************************************/
static drvError_t de_queue_inner(struct queue_manages *manage, unsigned int qid)
{
union atomic_queue_head cur_head, new_head;
unsigned int head, tail, depth;
queue_entity_node *que_entity = NULL;
void *buff = NULL;
drvError_t ret;
uint32_t blk_id;
queue_get_entity_and_depth(qid, &que_entity, &depth);
if (que_entity == NULL) {
QUEUE_LOG_ERR("queue entity is NULL. (qid=%u)\n", qid);
return DRV_ERROR_NOT_EXIST;
}
cur_head.head_value = queue_get_head_value(manage);
head = ((cur_head.head_info.head) % queue_get_local_depth(qid));
tail = queue_get_tail(manage, qid);
while (head != tail) {
Mbuf *mbuf = NULL;
buff = que_entity[head].node;
blk_id = que_entity[head].node_desp.blk_id;
new_head.head_value = queue_head_inc(cur_head, depth);
if (CAS(&manage->queue_head.head_value, cur_head.head_value, new_head.head_value) == true) {
ret = create_priv_mbuf_for_queue(&mbuf, buff, blk_id);
if (ret == DRV_ERROR_NONE) {
(void)mbuf_free_for_queue(mbuf, MBUF_FREE_BY_QUEUE_DS);
} else {
QUEUE_LOG_WARN("dequeue Mbuf is illegal. (qid=%u; mbuf=%p; head=%u)\n", manage->id, buff, head);
}
}
cur_head.head_value = new_head.head_value;
head = ((cur_head.head_info.head) % queue_get_local_depth(qid));
}
return DRV_ERROR_NONE;
}
STATIC void queue_wake_up_wait_event(struct queue_manages *que_manage, int rsp_ret)
{
#ifndef DRV_HOST
struct event_summary back_event = {0};
struct event_proc_result rsp = {0};
back_event.dst_engine = CCPU_HOST;
rsp.ret = rsp_ret;
back_event.msg_len = (unsigned int)sizeof(struct event_proc_result);
back_event.msg = (char *)&rsp;
back_event.subevent_id = queue_get_virtual_qid(que_manage->id, LOCAL_QUEUE);
back_event.policy = ONLY;
if ((que_manage->consumer.pid != 0) && (que_manage->consumer.dst_engine == CCPU_HOST)
&& (que_manage->consumer.inner_sub_flag == QUEUE_INNER_SUB_FLAG)) {
back_event.pid = que_manage->consumer.pid;
back_event.grp_id = que_manage->consumer.groupid;
back_event.event_id = (EVENT_ID)que_manage->consumer.eventid;
drvError_t ret = halEschedSubmitEvent(que_manage->dev_id, &back_event);
if (ret != 0) {
QUEUE_LOG_ERR("halEschedSubmitEvent. (ret=%d)\n", ret);
}
}
if ((que_manage->producer.pid != 0) && (que_manage->producer.dst_engine == CCPU_HOST)
&& (que_manage->producer.inner_sub_flag == QUEUE_INNER_SUB_FLAG)) {
back_event.pid = que_manage->producer.pid;
back_event.grp_id = que_manage->producer.groupid;
back_event.event_id = (EVENT_ID)que_manage->producer.eventid;
drvError_t ret = halEschedSubmitEvent(que_manage->dev_id, &back_event);
if (ret != 0) {
QUEUE_LOG_ERR("halEschedSubmitEvent. (ret=%d)\n", ret);
}
}
#endif
(void)que_manage;
(void)rsp_ret;
}
void queue_detach_invalid_queue(void)
{
struct queue_manages *que_manage = NULL;
unsigned int i;
for (i = 0; i < MAX_SURPORT_QUEUE_NUM; i++) {
if (queue_is_valid(i) == false) {
continue;
}
que_manage = queue_get_local_mng(i);
if (que_manage == NULL) {
continue;
}
if (que_manage->valid != QUEUE_UNCREATED) {
continue;
}
if (queue_local_is_attached_and_set_detached(i) == false) {
continue;
}
queue_put(i);
}
}
static drvError_t queue_destroy_res(struct queue_manages *que_manage, unsigned int qid)
{
atomic_value_set_invalid(&que_manage->tail_status);
if (is_queue_empty(que_manage) == false) {
drvError_t ret = de_queue_inner(que_manage, qid);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_ERR("queue free Mbuf failed. (qid=%u; ret=%d)\n", que_manage->id, (int)ret);
return DRV_ERROR_INNER_ERR;
}
}
que_manage->creator_pid = 0;
if (queue_is_merge_idx_valid(que_manage->merge_idx)) {
(void)queue_merge_uninit(que_manage, qid);
}
return DRV_ERROR_NONE;
}
static drvError_t queue_destroy_local(unsigned int dev_id, unsigned int qid)
{
struct queue_manages *que_manage = NULL;
drvError_t ret;
if (!queue_is_init()) {
QUEUE_LOG_ERR("queue not init. (qid=%u)\n", qid);
return DRV_ERROR_UNINIT;
}
que_manage = (struct queue_manages *)queue_get_que_addr(qid);
if (que_manage->valid != QUEUE_CREATED) {
return DRV_ERROR_NONE;
}
if (que_manage->inter_dev_state == QUEUE_STATE_IMPORTED) {
QUEUE_LOG_ERR("queue inter dev state err. (qid=%u)\n", qid);
return DRV_ERROR_PARA_ERROR;
}
queue_wake_up_wait_event(que_manage, QUEUE_IS_DESTROY_MAGIC);
if (CAS(&que_manage->valid, QUEUE_CREATED, QUEUE_DESTROYING) == false) {
QUEUE_LOG_INFO("repeated destroy. (dev_id=%u; queue_id=%u)\n", dev_id, qid);
return DRV_ERROR_NONE;
}
if (queue_local_is_attached_and_set_detached(qid) == false) {
QUEUE_LOG_ERR("repeated detach. (dev_id=%u, qid=%u)\n", dev_id, qid);
que_manage->valid = QUEUE_CREATED;
return DRV_ERROR_INNER_ERR;
}
ret = queue_destroy_res(que_manage, qid);
if (ret != DRV_ERROR_NONE) {
(void)queue_local_is_detached_and_se_attached(qid);
que_manage->valid = QUEUE_CREATED;
return ret;
}
que_manage->valid = QUEUE_UNCREATED;
queue_put(qid);
QUEUE_RUN_LOG_INFO("destroy success. (dev_id=%u; queue_id=%u)\n", dev_id, qid);
return DRV_ERROR_NONE;
}
STATIC drvError_t queue_reset_res(struct queue_manages *que_manage, unsigned int dev_id, unsigned int qid)
{
drvError_t ret;
if (is_queue_empty(que_manage) == false) {
ret = de_queue_inner(que_manage, qid);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_ERR("queue free mbuf failed. (dev_id=%u; qid=%u; ret=%d)\n",
dev_id, que_manage->id, (int)ret);
return DRV_ERROR_INNER_ERR;
}
}
return DRV_ERROR_NONE;
}
drvError_t queue_reset_local(unsigned int dev_id, unsigned int qid)
{
struct queue_manages *que_manage = NULL;
drvError_t ret;
if (queue_device_invalid(dev_id) || (qid >= MAX_SURPORT_QUEUE_NUM)) {
QUEUE_LOG_ERR("para is error. (dev_id=%u; qid=%u)\n", dev_id, qid);
return DRV_ERROR_INVALID_VALUE;
}
if (!queue_is_init()) {
QUEUE_LOG_ERR("queue not init. (dev_id=%u; qid=%u)\n", dev_id, qid);
return DRV_ERROR_UNINIT;
}
if (!queue_get(qid)) {
QUEUE_LOG_ERR("queue get failed. (dev_id=%u; qid=%u)\n", dev_id, qid);
return DRV_ERROR_NOT_EXIST;
}
que_manage = (struct queue_manages *)queue_get_que_addr(qid);
if (que_manage->valid != QUEUE_CREATED) {
queue_put(qid);
QUEUE_LOG_ERR("queue is not created. (dev_id=%u; qid=%u; valid=%d)\n",
dev_id, qid, que_manage->valid);
return DRV_ERROR_NOT_EXIST;
}
#ifndef EMU_ST
if (CAS(&que_manage->enque_cas, 0, 1) == false) {
queue_put(qid);
QUEUE_LOG_WARN("queue is try to enque. (dev_id=%u; qid=%u)\n", dev_id, qid);
return DRV_ERROR_BUSY;
}
if (CAS(&que_manage->deque_cas, 0, 1) == false) {
(void)CAS(&que_manage->enque_cas, 1, 0);
queue_put(qid);
QUEUE_LOG_WARN("queue is try to deque. (dev_id=%u; qid=%u)\n", dev_id, qid);
return DRV_ERROR_BUSY;
}
#endif
queue_wake_up_wait_event(que_manage, QUEUE_IS_CLEAR_MAGIC);
ret = queue_reset_res(que_manage, dev_id, qid);
if (ret == DRV_ERROR_NONE) {
que_manage->empty_flag = 1;
ATOMIC_SET(&que_manage->full_flag, 0);
QUEUE_RUN_LOG_INFO("queue buff reset success. (dev_id=%u; qid=%u)\n", dev_id, qid);
}
if (CAS(&que_manage->enque_cas, 1, 0) == false) {
QUEUE_LOG_ERR("enque write cas failed. (dev_id=%u; qid=%u; enque_cas=%d)\n",
dev_id, qid, que_manage->enque_cas);
}
if (CAS(&que_manage->deque_cas, 1, 0) == false) {
QUEUE_LOG_ERR("enque write cas failed. (dev_id=%u; qid=%u; enque_cas=%d)\n",
dev_id, qid, que_manage->enque_cas);
}
queue_put(qid);
return ret;
}
drvError_t queue_query_alive(unsigned int devid, unsigned int qid)
{
struct queue_manages *que_manage = NULL;
drvError_t ret;
ret = get_queue_manage_by_qid(devid, qid, &que_manage);
if (ret != DRV_ERROR_NONE) {
return ret;
}
if (!queue_get(qid)) {
return DRV_ERROR_NOT_EXIST;
}
if (que_manage->valid != QUEUE_CREATED) {
queue_put(qid);
return DRV_ERROR_NOT_EXIST;
}
queue_put(qid);
return DRV_ERROR_NONE;
}
static drvError_t submit_queue_event(unsigned int devid, unsigned int qid, struct sub_info *sub_event)
{
struct event_summary event = {0};
struct QueueEventMsg msg;
unsigned int submit_devid = devid;
if ((sub_event->pid == 0) || (sub_event->eventid == 0)) {
return DRV_ERROR_NO_PROCESS;
}
msg.src_location = sub_event->src_location;
msg.src_udevid = sub_event->src_udevid;
event.pid = sub_event->pid;
event.dst_engine = sub_event->dst_engine;
event.subevent_id = qid;
event.event_id = (EVENT_ID)sub_event->eventid;
event.grp_id = sub_event->groupid;
event.msg = (char *)&msg;
event.msg_len = sizeof(msg);
#ifndef DRV_HOST
if ((event.dst_engine == CCPU_DEVICE) || (event.dst_engine == ACPU_DEVICE)) {
if (sub_event->dst_devid != QUEUE_INVALID_VALUE) {
submit_devid = sub_event->dst_devid;
}
}
#endif
if (sub_event->spec_thread == true) {
event.tid = sub_event->tid;
} else {
event.tid = QUEUE_INVALID_VALUE;
}
return halEschedSubmitEventEx(submit_devid, sub_event->dst_devid, &event);
}
#define MAX_DEQUEUE_TIME 0xffffffffffffffffULL
STATIC bool is_queue_de_queue_time_out(struct group_merge *merge)
{
unsigned long long last_dequeue_time;
unsigned long long diff_time = 0;
unsigned long long cur_time;
cur_time = buff_get_cur_timestamp();
last_dequeue_time = merge->last_dequeue_time;
if (cur_time > last_dequeue_time) {
diff_time = cur_time - last_dequeue_time;
}
if (diff_time > ENQUE_EVENT_TIMEOUT) {
merge->last_dequeue_time = MAX_DEQUEUE_TIME;
return true;
}
return false;
}
static drvError_t queue_group_event_fail(struct queue_manages *manage, struct group_merge *merge,
drvError_t result)
{
if (result == DRV_ERROR_NO_PROCESS) {
ATOMIC_INC(&manage->stat.enque_none_subscrib);
return DRV_ERROR_NONE;
}
ATOMIC_INC(&merge->event_stat.user_event_fail);
ATOMIC_SET(&merge->enque_event_ret, (unsigned int)result);
ATOMIC_INC(&manage->stat.enque_event_fail);
ATOMIC_SET(&manage->stat.enque_event_ret, (unsigned int)result);
QUEUE_LOG_ERR("send enqueue event failed. (device_id=%u; queue_id=%u; pid=%d; spec_thread=%d; tid=%u; ret=%d)\n",
manage->dev_id, manage->id, manage->consumer.pid, manage->consumer.spec_thread, manage->consumer.tid, result);
return result;
}
static void que_sub_info_order_assign(struct sub_info *sub, struct sub_info *event)
{
event->pid = sub->pid;
* Ensure other subscription information is consistent with the validity of the pid.
*/
wmb();
event->src_location = sub->src_location;
event->src_udevid = sub->src_udevid;
event->dst_engine = sub->dst_engine;
event->groupid = sub->groupid;
event->spec_thread = sub->spec_thread;
event->tid = sub->tid;
event->dst_devid = sub->dst_devid;
event->sub_send = sub->sub_send;
* Ensure other subscription information is consistent with the validity of the eventid.
*/
wmb();
event->eventid = sub->eventid;
}
static drvError_t send_group_queue_event(unsigned int devid, struct queue_manages *manage, int add_size)
{
(void)add_size;
struct group_merge *merge = NULL;
int merge_idx = manage->merge_idx;
unsigned int qid = queue_get_virtual_qid(manage->id, LOCAL_QUEUE);
if (queue_is_merge_idx_valid(merge_idx) == false) {
QUEUE_RUN_LOG_INFO("queue merge_idx is invalid. (queue_id=%u; bind_type=%d; merge_idx=%d)\n", manage->id,
manage->bind_type, merge_idx);
return DRV_ERROR_INNER_ERR;
}
merge = queue_get_merge_addr(merge_idx);
if (merge->last_dequeue_time == 0) {
(void)CAS(&merge->last_dequeue_time, 0, buff_get_cur_timestamp());
}
if (merge->pause_flag == 0 && (CAS(&merge->atomic_flag, 0, 1) || is_queue_de_queue_time_out(merge))) {
struct sub_info sub_event = {0};
drvError_t ret;
que_sub_info_order_assign(&manage->consumer, &sub_event);
ret = submit_queue_event(devid, qid, &sub_event);
if (ret != DRV_ERROR_NONE) {
(void)CAS(&merge->atomic_flag, 1, 0);
return queue_group_event_fail(manage, merge, ret);
}
ATOMIC_INC(&merge->event_stat.user_event_succ);
ATOMIC_SET(&merge->enque_event_ret, DRV_ERROR_NONE);
ATOMIC_INC(&manage->stat.enque_event_ok);
ATOMIC_SET(&manage->stat.enque_event_ret, DRV_ERROR_NONE);
}
return DRV_ERROR_NONE;
}
static drvError_t queue_single_event_fail(struct queue_manages *manage, drvError_t result)
{
if (result == DRV_ERROR_NO_PROCESS) {
ATOMIC_INC(&manage->stat.enque_none_subscrib);
return DRV_ERROR_NONE;
}
ATOMIC_INC(&manage->stat.enque_event_fail);
ATOMIC_SET(&manage->stat.enque_event_ret, (unsigned int)result);
QUEUE_LOG_ERR("send queue event failed. "
"(dev_id=%u; queue_id=%u; mode=%d; pid=%d; spec_thread=%d; tid=%u; ret=%d)\n",
manage->dev_id, manage->id, manage->work_mode, manage->consumer.pid,
manage->consumer.spec_thread, manage->consumer.tid, result);
return result;
}
static drvError_t send_single_queue_event(unsigned int devid, struct queue_manages *manage)
{
int mode = (manage->work_mode == 0) ? QUEUE_MODE_PUSH : manage->work_mode;
struct sub_info sub_event = {0};
unsigned int qid = queue_get_virtual_qid(manage->id, LOCAL_QUEUE);
drvError_t ret;
if ((mode != (int)QUEUE_MODE_PULL) && (CAS(&manage->event_flag, 0, 1))) {
que_sub_info_order_assign(&manage->consumer, &sub_event);
ret = submit_queue_event(devid, qid, &sub_event);
if (ret != DRV_ERROR_NONE) {
CAS(&manage->event_flag, 1, 0);
return queue_single_event_fail(manage, ret);
}
ATOMIC_INC(&manage->stat.enque_event_ok);
ATOMIC_SET(&manage->stat.enque_event_ret, DRV_ERROR_NONE);
} else if ((mode == (int)QUEUE_MODE_PULL) && (CAS(&manage->empty_flag, 1, 0))) {
que_sub_info_order_assign(&manage->consumer, &sub_event);
ret = submit_queue_event(devid, qid, &sub_event);
if (ret != DRV_ERROR_NONE) {
CAS(&manage->empty_flag, 0, 1);
return queue_single_event_fail(manage, ret);
}
ATOMIC_INC(&manage->stat.enque_event_ok);
ATOMIC_SET(&manage->stat.enque_event_ret, DRV_ERROR_NONE);
}
return DRV_ERROR_NONE;
}
void send_queue_event(unsigned int dev_id, struct queue_manages *manage)
{
int type = manage->bind_type;
if (manage->consumer.pid == 0) {
ATOMIC_INC(&manage->stat.enque_none_subscrib);
return;
}
if (type == (int)QUEUE_TYPE_GROUP) {
(void)send_group_queue_event(dev_id, manage, 1);
} else {
(void)send_single_queue_event(dev_id, manage);
}
return;
}
static drvError_t over_write_queue(struct queue_manages *que_manage, unsigned int qid)
{
unsigned int head, depth, size;
union atomic_queue_head cur_head, new_head;
queue_entity_node *que_entity = NULL;
void *buff = NULL;
drvError_t ret;
uint32_t blk_id;
queue_get_entity_and_depth(qid, &que_entity, &depth);
if (que_entity == NULL) {
QUEUE_LOG_ERR("queue entity is NULL.(qid=%u)\n", qid);
return DRV_ERROR_NOT_EXIST;
}
cur_head.head_value = queue_get_head_value(que_manage);
head = ((cur_head.head_info.head) % queue_get_local_depth(qid));
size = get_queue_size(head, queue_get_orig_tail(que_manage), depth);
if ((size < (depth - QUEUE_DEPT_RES_SIZE)) || (size == 0)) {
return DRV_ERROR_NONE;
}
buff = que_entity[head].node;
blk_id = que_entity[head].node_desp.blk_id;
new_head.head_value = queue_head_inc(cur_head, depth);
if (CAS(&que_manage->queue_head.head_value, cur_head.head_value, new_head.head_value) == true) {
Mbuf *mbuf = NULL;
ATOMIC_INC(&que_manage->stat.enque_drop);
ret = create_priv_mbuf_for_queue(&mbuf, buff, blk_id);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_ERR("ow dequeue mbuf is illegal. (qid=%u, mbuf=%p)\n", qid, buff);
return DRV_ERROR_INNER_ERR;
}
ret = mbuf_free_for_queue(mbuf, (int)MBUF_FREE_BY_QUEUE_OW);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_ERR("ow free mbuf failed. (qid=%u, mbuf=%p, ret=%d)\n", qid, buff, ret);
return DRV_ERROR_INNER_ERR;
}
}
return DRV_ERROR_NONE;
}
static drvError_t queue_enqueue(struct queue_manages *que_manage, unsigned int qid, Mbuf *mbuf)
{
void *buff = (void *)get_share_mbuf_by_mbuf(mbuf);
unsigned int depth, tail;
queue_entity_node *que_entity = NULL;
queue_get_entity_and_depth(qid, &que_entity, &depth);
if (que_entity == NULL) {
QUEUE_LOG_ERR("queue entity is NULL.(qid=%u)\n", qid);
return DRV_ERROR_NOT_EXIST;
}
mbuf_owner_update_for_enque(mbuf, g_cur_pid, qid);
tail = queue_get_tail(que_manage, qid);
(void)ATOMIC_SET((&que_entity[tail].node), buff);
(void)ATOMIC_SET((&que_entity[tail].node_desp.blk_id), mbuf->blk_id);
wmb();
if (queue_enque_set_tail(que_manage, depth, qid) == false) {
return DRV_ERROR_QUEUE_NOT_CREATED;
}
destroy_priv_mbuf_for_queue(mbuf);
ATOMIC_INC(&que_manage->stat.enque_ok);
que_get_time(&que_manage->stat.last_enque_time);
return DRV_ERROR_NONE;
}
static inline drvError_t queue_buff_verify(void *buff)
{
if (buff == NULL) {
QUEUE_LOG_ERR("buff is NULL.\n");
return DRV_ERROR_PARA_ERROR;
}
return DRV_ERROR_NONE;
}
static inline drvError_t queue_enqueue_para_check(unsigned int dev_id, unsigned int qid, void *buff)
{
if (queue_device_invalid(dev_id) || (qid >= MAX_SURPORT_QUEUE_NUM)) {
QUEUE_LOG_ERR("para is error. (dev_id=%u, qid=%u)\n", dev_id, qid);
return DRV_ERROR_INVALID_VALUE;
}
if (queue_buff_verify(buff) != DRV_ERROR_NONE) {
QUEUE_LOG_ERR("enqueue muff is illegal. (qid=%u, mbuf=%p)\n", qid, buff);
return DRV_ERROR_INVALID_VALUE;
}
return DRV_ERROR_NONE;
}
drvError_t queue_enqueue_local(unsigned int dev_id, unsigned int qid, void *mbuf)
{
struct queue_manages *que_manage = NULL;
enque_timestamps enque_timestamp = {0};
drvError_t ret;
enque_timestamp.total_start_timestamp = buff_get_cur_timestamp();
ret = queue_enqueue_para_check(dev_id, qid, mbuf);
if (ret != DRV_ERROR_NONE) {
return ret;
}
if (!queue_get(qid)) {
QUEUE_LOG_ERR("queue get failed. (qid=%u)\n", qid);
return DRV_ERROR_NOT_EXIST;
}
que_manage = queue_get_local_mng(qid);
if (que_manage == NULL) {
queue_put(qid);
QUEUE_LOG_ERR("Queue local manage is null. (dev_id=%u, qid=%u)\n", dev_id, qid);
return DRV_ERROR_NOT_EXIST;
}
if (que_manage->valid != QUEUE_CREATED) {
queue_put(qid);
QUEUE_LOG_ERR(" queue(%u) is not created.\n", qid);
return DRV_ERROR_NOT_EXIST;
}
if ((que_manage->inter_dev_state == QUEUE_STATE_UNEXPORTED) ||
(que_manage->inter_dev_state == QUEUE_STATE_UNIMPORTED)) {
queue_put(qid);
return DRV_ERROR_PARA_ERROR;
}
ATOMIC_INC(&que_manage->stat.enque_fail);
if (CAS(&que_manage->enque_cas, 0, 1) == false) {
queue_put(qid);
QUEUE_LOG_WARN("queue is try to enque multiple times. (qid=%u)\n", qid);
return DRV_ERROR_BUSY;
}
if ((que_manage->over_write != 0) &&
(get_que_size_by_mng(que_manage, qid) >= (queue_get_local_depth(qid) - QUEUE_DEPT_RES_SIZE))) {
enque_timestamp.overwrite_start_timstamp = buff_get_cur_timestamp();
ret = over_write_queue(que_manage, qid);
if (ret != DRV_ERROR_NONE) {
goto ERR;
}
enque_timestamp.overwrite_end_timestamp = buff_get_cur_timestamp();
} else if (is_queue_full(que_manage, qid)) {
send_queue_event(dev_id, que_manage);
ATOMIC_INC(&que_manage->stat.enque_full);
ATOMIC_SET(&que_manage->full_flag, 1);
ret = DRV_ERROR_QUEUE_FULL;
goto ERR;
}
ret = queue_enqueue(que_manage, qid, (Mbuf *)mbuf);
if (ret != DRV_ERROR_NONE) {
goto ERR;
}
* otherwise concurrent enqueue and dequeue will cause E2NE event to be lost.
*/
mb();
ATOMIC_DEC(&que_manage->stat.enque_fail);
send_queue_event(dev_id, que_manage);
if (is_queue_full(que_manage, qid)) {
ATOMIC_SET(&que_manage->full_flag, 1);
}
ERR:
enque_timestamp.total_end_timestamp = buff_get_cur_timestamp();
if (CAS(&que_manage->enque_cas, 1, 0) == false) {
QUEUE_LOG_ERR("enque write cas failed. (qid=%u; enque_cas=%d)\n", qid, que_manage->enque_cas);
}
queue_put(qid);
if (enque_timestamp.total_end_timestamp - enque_timestamp.total_start_timestamp > MAX_ENQUE_TIME) {
QUEUE_LOG_INFO("enqueue start:(%llu), enque end:(%llu), over_write start:(%llu), over_write end:(%llu), "
"event start:(%llu), event end:(%llu), enter kernel:(%llu), kernel handle start:(%llu), "
"kernel handle end:(%llu),enqueue total:(%llu), over_write total:(%llu), "
"event total:(%llu),kernel total:(%llu).\n",
enque_timestamp.total_start_timestamp, enque_timestamp.total_end_timestamp,
enque_timestamp.overwrite_start_timstamp, enque_timestamp.overwrite_end_timestamp,
enque_timestamp.submit_start_timestamp, enque_timestamp.submit_end_timestamp,
enque_timestamp.kern_start_timestamp, enque_timestamp.kern_submit_timstamp,
enque_timestamp.kern_end_timestamp,
enque_timestamp.total_end_timestamp - enque_timestamp.total_start_timestamp,
enque_timestamp.overwrite_end_timestamp - enque_timestamp.overwrite_start_timstamp,
enque_timestamp.submit_end_timestamp - enque_timestamp.submit_start_timestamp,
enque_timestamp.kern_end_timestamp - enque_timestamp.kern_start_timestamp);
}
return ret;
}
void send_f_to_nf_event(unsigned int dev_id, struct queue_manages *que_manage)
{
struct sub_info sub_event = {0};
drvError_t ret;
unsigned int qid = queue_get_virtual_qid(que_manage->id, LOCAL_QUEUE);
if (que_manage->producer.inner_sub_flag == QUEUE_INTER_SUB_FLAG) {
qid = que_manage->remote_qid;
}
ATOMIC_SET(&que_manage->full_flag, 0);
que_sub_info_order_assign(&que_manage->producer, &sub_event);
ret = submit_queue_event(dev_id, qid, &sub_event);
if (ret != DRV_ERROR_NONE) {
if (ret == DRV_ERROR_NO_PROCESS) {
return;
}
ATOMIC_SET(&que_manage->full_flag, 1);
ATOMIC_INC(&que_manage->stat.f2nf_event_fail);
if (ret == DRV_ERROR_NO_SUBSCRIBE_THREAD) {
return;
}
QUEUE_LOG_ERR("send device queue f2nf event failed. (dev_id=%u; qid=%u; ret=%d)\n", dev_id, qid, ret);
} else {
ATOMIC_INC(&que_manage->stat.f2nf_event_ok);
}
return;
}
static void sub_queue_send_event(unsigned int dev_id, struct queue_manages *manage)
{
if (manage->queue_head.head_info.head != queue_get_orig_tail(manage)) {
send_queue_event(dev_id, manage);
}
}
static drvError_t dequeue_one_mbuf(struct queue_manages *que_manage, unsigned int qid,
void **buff_inner, uint32_t *blk_id)
{
union atomic_queue_head cur_head, new_head;
unsigned int depth, head, cas_ret;
queue_entity_node *que_entity = NULL;
uint32_t blk_id_;
void *buff = NULL;
unsigned int cnt;
queue_get_entity_and_depth(qid, &que_entity, &depth);
if (que_entity == NULL) {
QUEUE_LOG_ERR("queue entity is NULL.(qid=%u)\n", qid);
return DRV_ERROR_NOT_EXIST;
}
for (cnt = 0; cnt < MAX_DEQUEUE_TRY_TIMES; cnt++) {
cur_head.head_value = queue_get_head_value(que_manage);
head = ((cur_head.head_info.head) % queue_get_local_depth(qid));
if (head == queue_get_orig_tail(que_manage)) {
return DRV_ERROR_QUEUE_EMPTY;
}
* read the corresponding content of head first and then determine whether the queue is empty.
* Therefore, rmb should be added to preserve order.
*/
rmb();
buff = que_entity[head].node;
blk_id_ = que_entity[head].node_desp.blk_id;
new_head.head_value = queue_head_inc(cur_head, depth);
cas_ret = CAS(&que_manage->queue_head.head_value, cur_head.head_value, new_head.head_value);
if (cas_ret == true) {
break;
}
}
if (cas_ret == false) {
QUEUE_LOG_WARN("try dequeue again. (qid=%u; cnt=%u)\n", qid, cnt);
return DRV_ERROR_BUSY;
}
ATOMIC_INC(&que_manage->stat.deque_num);
*buff_inner = buff;
*blk_id = blk_id_;
return DRV_ERROR_NONE;
}
static drvError_t dequeue_by_flow_ctrl(struct queue_manages *que_manage, unsigned int qid, Mbuf **mbuf)
{
unsigned long timestamp;
Mbuf *mbuf_inner = NULL;
void *buff_inner = NULL;
unsigned int size;
drvError_t ret;
uint32_t blk_id;
ret = dequeue_one_mbuf(que_manage, qid, &buff_inner, &blk_id);
if (ret != DRV_ERROR_NONE) {
return ret;
}
* If the mbuf is illegal, the error code DRV_ERROR_NO_RESOURCES needs to be returned,
* and the upper layer will perform selective processing based on the error code.
*/
ret = create_priv_mbuf_for_queue(&mbuf_inner, buff_inner, blk_id);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_ERR("dequeue mbuf is illegal. (qid=%u; mbuf=0x%llx)\n",
qid, (unsigned long long)(uintptr_t)buff_inner);
return ret;
}
size = get_que_size_by_mng(que_manage, qid);
timestamp = buff_get_cur_timestamp();
if ((que_manage->fctl_flag != 0)) {
unsigned int cnt = 0;
while ((!is_queue_empty(que_manage)) && (cnt < size) &&
(tick_to_millisec(timestamp - (unsigned long)mbuf_get_timestamp(mbuf_inner)) >= que_manage->drop_time)) {
ret = mbuf_free_for_queue(mbuf_inner, MBUF_FREE_BY_QUEUE_DQ);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_ERR("free mbuff failed. (qid=%u; mbuf=%p; ret=%d)\n", que_manage->id, buff_inner, ret);
return DRV_ERROR_INNER_ERR;
}
ATOMIC_INC(&que_manage->stat.deque_drop);
ret = dequeue_one_mbuf(que_manage, qid, &buff_inner, &blk_id);
if (ret != DRV_ERROR_NONE) {
return ret;
}
* If the mbuf is illegal, the error code DRV_ERROR_NO_RESOURCES needs to be returned,
* and the upper layer will perform selective processing based on the error code.
*/
ret = create_priv_mbuf_for_queue(&mbuf_inner, buff_inner, blk_id);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_ERR("dequeue mbuf is illegal. (qid=%u; mbuf=%p)\n", que_manage->id, buff_inner);
#ifndef EMU_ST
return ret;
#endif
}
cnt++;
}
}
*mbuf = mbuf_inner;
ATOMIC_INC(&que_manage->stat.deque_ok);
que_get_time(&que_manage->stat.last_deque_time);
return DRV_ERROR_NONE;
}
static inline drvError_t queue_dequeue_para_check(unsigned int dev_id, unsigned int qid, void **mbuf)
{
if (queue_device_invalid(dev_id) || (qid >= MAX_SURPORT_QUEUE_NUM)) {
QUEUE_LOG_ERR("para is error. (dev_id=%u, qid=%u)\n", dev_id, qid);
return DRV_ERROR_INVALID_VALUE;
}
if (mbuf == NULL) {
QUEUE_LOG_ERR("mbuf is NULL. (dev_id=%u, qid=%u)\n", dev_id, qid);
return DRV_ERROR_INVALID_VALUE;
}
return DRV_ERROR_NONE;
}
drvError_t queue_dequeue_local(unsigned int dev_id, unsigned int qid, void **mbuf)
{
struct queue_manages *que_manage = NULL;
static THREAD pid_t g_deque_pid = 0;
struct group_merge *merge = NULL;
Mbuf *p_mbuf = NULL;
drvError_t ret;
int merge_idx;
ret = queue_dequeue_para_check(dev_id, qid, mbuf);
if (ret != DRV_ERROR_NONE) {
return ret;
}
if (!queue_get(qid)) {
QUEUE_LOG_ERR("queue get failed. (qid=%u)\n", qid);
return DRV_ERROR_NOT_EXIST;
}
que_manage = queue_get_local_mng(qid);
if (que_manage == NULL) {
queue_put(qid);
QUEUE_LOG_ERR("Queue local manage is null. (dev_id=%u, qid=%u)\n", dev_id, qid);
return DRV_ERROR_NOT_EXIST;
}
if (que_manage->valid != QUEUE_CREATED) {
queue_put(qid);
QUEUE_LOG_ERR("queue is not created. (qid=%u; valid=%d; QUEUE_CREATED=%d)\n",
qid, que_manage->valid, QUEUE_CREATED);
return DRV_ERROR_NOT_EXIST;
}
ATOMIC_INC(&que_manage->stat.deque_fail);
if (is_queue_empty(que_manage)) {
* Enqueuing may occur before the empty_flag is set to 1.
* Here, add an attempt to send an E2NE event.
*/
que_manage->empty_flag = 1;
* Make sure to set the empty_flag first and then read tail,
* otherwise concurrent enqueue and dequeue will cause E2NE event to be lost.
*/
mb();
sub_queue_send_event(dev_id, que_manage);
ATOMIC_INC(&que_manage->stat.deque_empty);
queue_put(qid);
return DRV_ERROR_QUEUE_EMPTY;
}
if (CAS(&que_manage->deque_cas, 0, 1) == false) {
queue_put(qid);
QUEUE_LOG_WARN("queue is try to deque multiple times. (qid=%u)\n", qid);
return DRV_ERROR_BUSY;
}
ret = dequeue_by_flow_ctrl(que_manage, qid, &p_mbuf);
(void)CAS(&que_manage->deque_cas, 1, 0);
if (ret == DRV_ERROR_QUEUE_EMPTY) {
* Here, add an attempt to send an E2NE event.
*/
que_manage->empty_flag = 1;
* otherwise concurrent enqueue and dequeue will cause E2NE event to be lost.
*/
mb();
sub_queue_send_event(dev_id, que_manage);
ATOMIC_INC(&que_manage->stat.deque_empty);
queue_put(qid);
return ret;
} else if (ret != DRV_ERROR_NONE) {
queue_put(qid);
return ret;
}
ATOMIC_DEC(&que_manage->stat.deque_fail);
if (que_manage->bind_type == QUEUE_TYPE_GROUP) {
merge_idx = que_manage->merge_idx;
if (queue_is_merge_idx_valid(merge_idx)) {
merge = queue_get_merge_addr(merge_idx);
merge->last_dequeue_time = buff_get_cur_timestamp();
}
}
if (g_deque_pid == 0) {
g_deque_pid = GETPID();
}
mbuf_owner_update_for_deque(p_mbuf, g_deque_pid, qid);
*mbuf = p_mbuf;
if (que_manage->full_flag != 0) {
send_f_to_nf_event(dev_id, que_manage);
}
queue_put(qid);
return DRV_ERROR_NONE;
}
drvError_t check_subscribe_para(unsigned int dev_id, unsigned int qid, int type)
{
struct queue_manages *manage = NULL;
drvError_t ret;
ret = get_queue_manage_by_qid(dev_id, qid, &manage);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_ERR("para invalid. (qid=%u; type=%d)\n", qid, type);
return ret;
}
if (manage->valid != QUEUE_CREATED) {
QUEUE_LOG_ERR("queue is not created. (qid=%u)\n", qid);
return DRV_ERROR_NOT_EXIST;
}
if (manage->consumer.pid != 0) {
QUEUE_LOG_ERR("Queue has been subscribed. (qid=%u; pid=%d; spec_thread=%d; tid=%u)\n",
manage->id, manage->consumer.pid, manage->consumer.spec_thread, manage->consumer.tid);
return DRV_ERROR_REPEATED_SUBSCRIBED;
}
if ((type > QUEUE_TYPE_SINGLE) || ((manage->work_mode == QUEUE_MODE_PULL) && (type == QUEUE_TYPE_GROUP))) {
QUEUE_LOG_ERR("para invalid. (qid=%u; work_mode=%d; type=%d)\n", qid, manage->work_mode, type);
return DRV_ERROR_PARA_ERROR;
}
return DRV_ERROR_NONE;
}
static drvError_t queue_get_event_src(unsigned int dev_id, unsigned int dst_engine,
unsigned int *src_location, unsigned int *src_udevid)
{
drvError_t ret;
#ifdef DRV_HOST
(void)dst_engine;
*src_location = EVENT_SRC_LOCATION_HOST;
ret = uda_get_udevid_by_devid_ex(dev_id, src_udevid);
#else
*src_location = EVENT_SRC_LOCATION_DEVICE;
if (dst_engine == CCPU_HOST) {
ret = drvGetDevIDByLocalDevID(dev_id, src_udevid);
} else {
ret = uda_get_udevid_by_devid(dev_id, src_udevid);
}
#endif
return ret;
}
drvError_t subscribe_queue(unsigned int dev_id, unsigned int qid, struct sub_info sub_info, int type)
{
struct queue_manages *manage = NULL;
drvError_t ret;
manage = queue_get_local_mng(qid);
if (manage == NULL) {
QUEUE_LOG_ERR("Queue local manage is null. (dev_id=%u, qid=%u)\n", dev_id, qid);
return DRV_ERROR_NOT_EXIST;
}
switch (type) {
case QUEUE_TYPE_GROUP:
ret = queue_merge_init(manage, qid, sub_info.pid, sub_info.groupid);
if (ret != DRV_ERROR_NONE) {
return ret;
}
break;
case QUEUE_TYPE_SINGLE:
break;
default:
QUEUE_LOG_ERR("subscribe queue type is not valid. (qid=%u, type=%d)\n", manage->id, type);
return DRV_ERROR_PARA_ERROR;
}
ret = queue_register_callback(sub_info.groupid);
if (ret != DRV_ERROR_NONE) {
if (type == QUEUE_TYPE_GROUP) {
(void)queue_merge_uninit(manage, qid);
}
QUEUE_LOG_ERR("register callback failed. (dev_id=%u; queue_id=%u; type=%d)\n", dev_id, manage->id, type);
return ret;
}
ret = queue_get_event_src(dev_id, sub_info.dst_engine, &manage->consumer.src_location, &manage->consumer.src_udevid);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_ERR("get event src failed. (devid=%u, qid=%u)\n", dev_id, qid);
return ret;
}
manage->bind_type = type;
manage->consumer.dst_engine = sub_info.dst_engine;
manage->consumer.eventid = sub_info.eventid;
manage->consumer.groupid = sub_info.groupid;
manage->consumer.sub_send = sub_info.sub_send;
manage->consumer.spec_thread = sub_info.spec_thread;
manage->consumer.tid = sub_info.tid;
manage->consumer.dst_devid = sub_info.dst_devid;
manage->consumer.inner_sub_flag = sub_info.inner_sub_flag;
* The enqueue event can be sent only after all structure variables are assigned values.
*/
wmb();
manage->consumer.pid = sub_info.pid;
if (manage->consumer.sub_send == 1) {
sub_queue_send_event(dev_id, manage);
}
QUEUE_LOG_INFO("subscribe queue success. (queue=%u; pid=%d; spec_thread=%d; tid=%u;"
" group=%u; event_id=%u; type=%d; event_flag=%d;"
" enque_ok=0x%llx; deque_ok=0x%llx;"
" enque_event_ok=0x%llx; enque_event_fail=0x%llx; enque_none_subscrib=0x%lx;"
" head=%u; tail=%u)\n",
manage->id, manage->consumer.pid, manage->consumer.spec_thread, manage->consumer.tid,
manage->consumer.groupid, manage->consumer.eventid, type, manage->event_flag,
manage->stat.enque_ok, manage->stat.deque_ok,
manage->stat.enque_event_ok, manage->stat.enque_event_fail, manage->stat.enque_none_subscrib,
manage->queue_head.head_info.head, queue_get_orig_tail(manage));
return DRV_ERROR_NONE;
}
STATIC drvError_t queue_subscribe_local(unsigned int dev_id, unsigned int qid, unsigned int group_id, int type)
{
struct QueueSubPara sub_para = {0};
sub_para.devId = dev_id;
sub_para.qid = qid;
sub_para.groupId = group_id;
sub_para.eventType = QUEUE_ENQUE_EVENT;
sub_para.queType = type;
sub_para.flag = 0;
return queue_sub_event_local(&sub_para);
}
STATIC drvError_t queue_unsubscribe_local(unsigned int dev_id, unsigned int qid)
{
struct QueueUnsubPara unsub_para = {0};
unsub_para.devId = dev_id;
unsub_para.qid = qid;
unsub_para.eventType = QUEUE_ENQUE_EVENT;
return queue_unsub_event_local(&unsub_para);
}
drvError_t sub_f_to_nf_event(unsigned int dev_id, unsigned int qid, struct sub_info sub_event)
{
static struct timeval last_log_time = {0};
struct timeval current_time;
struct queue_manages *manage = NULL;
drvError_t ret;
ret = get_queue_manage_by_qid(dev_id, qid, &manage);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_ERR("para qid(%u) invalid.\n", qid);
return ret;
}
if (manage->valid != QUEUE_CREATED) {
QUEUE_LOG_ERR("queue(%u) is not created.\n", qid);
return DRV_ERROR_NOT_EXIST;
}
if ((manage->inter_dev_state == QUEUE_STATE_UNEXPORTED) || (manage->inter_dev_state == QUEUE_STATE_UNIMPORTED)) {
QUEUE_LOG_INFO("queue(%u) has been unexported or unimported. (state=%d)\n", qid, manage->inter_dev_state);
return DRV_ERROR_NOT_SUPPORT;
}
if (manage->producer.pid != 0) {
QUEUE_LOG_ERR("queue(%u) has been subscribed.\n", qid);
return DRV_ERROR_REPEATED_SUBSCRIBED;
}
ret = queue_get_event_src(dev_id, sub_event.dst_engine, &manage->producer.src_location, &manage->producer.src_udevid);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_ERR("get event src failed. (devid=%u, qid=%u)\n", dev_id, qid);
return ret;
}
manage->producer.dst_engine = sub_event.dst_engine;
manage->producer.eventid = sub_event.eventid;
manage->producer.groupid = sub_event.groupid;
manage->producer.sub_send = sub_event.sub_send;
manage->producer.spec_thread = sub_event.spec_thread;
manage->producer.tid = sub_event.tid;
manage->producer.dst_devid = sub_event.dst_devid;
manage->producer.inner_sub_flag = sub_event.inner_sub_flag;
* The f2nf event can be sent only after all structure variables are assigned values.
*/
wmb();
manage->producer.pid = sub_event.pid;
if (manage->producer.sub_send == 1 && manage->full_flag == 0) {
send_f_to_nf_event(dev_id, manage);
}
que_get_time(¤t_time);
if ((current_time.tv_sec - last_log_time.tv_sec) > LOG_TIME_INTERVAL) {
last_log_time = current_time;
QUEUE_RUN_LOG_INFO("sub_f_to_nf_event success. (dev_id=%d; queue=%u; pid=%d; spec_thread=%d; tid=%u; group=%u;"
"enqueue_full=%llu)\n", dev_id, qid, manage->producer.pid, manage->consumer.spec_thread,
manage->consumer.tid, manage->producer.groupid, manage->stat.enque_full);
}
return DRV_ERROR_NONE;
}
STATIC drvError_t queue_sub_f_to_nf_event_local(unsigned int dev_id, unsigned int qid, unsigned int group_id)
{
struct QueueSubPara sub_para = {0};
sub_para.devId = dev_id;
sub_para.qid = qid;
sub_para.groupId = group_id;
sub_para.eventType = QUEUE_F2NF_EVENT;
sub_para.flag = 0;
return queue_sub_event_local(&sub_para);
}
STATIC drvError_t queue_unsub_f_to_nf_event_local(unsigned int dev_id, unsigned int qid)
{
struct QueueUnsubPara unsub_para = {0};
unsub_para.devId = dev_id;
unsub_para.qid = qid;
unsub_para.eventType = QUEUE_F2NF_EVENT;
return queue_unsub_event_local(&unsub_para);
}
static drvError_t queue_sub_para_check(struct QueueSubPara *sub_para)
{
struct queue_manages *manage = NULL;
drvError_t ret;
if (sub_para->eventType >= QUEUE_EVENT_TYPE_MAX) {
QUEUE_LOG_ERR("event_type is invalid. (event_type=%d; event_typeMax=%d)\n",
sub_para->eventType, QUEUE_EVENT_TYPE_MAX);
return DRV_ERROR_PARA_ERROR;
}
ret = get_queue_manage_by_qid(sub_para->devId, sub_para->qid, &manage);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_ERR("para is invalid. (dev_id=%u; qid=%u)\n", sub_para->devId, sub_para->qid);
return ret;
}
if (manage->valid != QUEUE_CREATED) {
QUEUE_LOG_ERR("queue is not created. (devid=%u; qid=%u)\n", sub_para->devId, sub_para->qid);
return DRV_ERROR_NOT_EXIST;
}
return DRV_ERROR_NONE;
}
unsigned int queue_get_dst_engine(unsigned int dev_id, unsigned int group_id)
{
unsigned int dst_engine;
#ifdef DRV_HOST
(void)dev_id;
(void)group_id;
dst_engine = CCPU_HOST;
#else
GROUP_TYPE type = GRP_TYPE_BIND_DP_CPU;
drvError_t ret;
ret = esched_query_grp_type(dev_id, group_id, &type);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_WARN("get group type failed. (ret=%d; dev_id=%u; group_id=%u)\n",
ret, dev_id, group_id);
}
dst_engine = (type == GRP_TYPE_BIND_CP_CPU) ? CCPU_DEVICE : ACPU_DEVICE;
#endif
return dst_engine;
}
static void queue_set_sub_info(struct QueueSubPara *sub_para, unsigned int event_id, struct sub_info *sub_info)
{
unsigned int dst_dev = ((sub_para->flag & QUEUE_SUB_FLAG_SPEC_DST_DEVID) != 0) ? sub_para->dstDevId : sub_para->devId;
sub_info->dst_engine = queue_get_dst_engine(dst_dev, sub_para->groupId);
sub_info->eventid = event_id;
sub_info->groupid = sub_para->groupId;
sub_info->pid = GETPID();
sub_info->spec_thread = ((sub_para->flag & QUEUE_SUB_FLAG_SPEC_THREAD) != 0) ? true : false;
sub_info->tid = sub_para->threadId;
sub_info->dst_devid = ((sub_para->flag & QUEUE_SUB_FLAG_SPEC_DST_DEVID) != 0) ?
sub_para->dstDevId : QUEUE_INVALID_VALUE;
sub_info->sub_send = 1;
}
static drvError_t queue_sub_enque_event(struct QueueSubPara *sub_para)
{
struct queue_manages *manage = queue_get_local_mng(sub_para->qid);
struct sub_info sub_info = {0};
int event_id;
if (manage->consumer.pid != 0) {
QUEUE_LOG_ERR("Queue has been subscribed. (queue_id=%u; pid=%d; spec_thread=%d; tid=%u)\n",
sub_para->qid, manage->consumer.pid, manage->consumer.spec_thread, manage->consumer.tid);
return DRV_ERROR_REPEATED_SUBSCRIBED;
}
if ((sub_para->queType > QUEUE_TYPE_SINGLE) ||
((manage->work_mode == QUEUE_MODE_PULL) && (sub_para->queType == QUEUE_TYPE_GROUP))) {
QUEUE_LOG_ERR("para invalid. (qid=%u; work_mode=%d; type=%d)\n",
sub_para->qid, manage->work_mode, sub_para->queType);
return DRV_ERROR_PARA_ERROR;
}
event_id = (manage->work_mode == QUEUE_MODE_PULL) ? EVENT_QUEUE_EMPTY_TO_NOT_EMPTY : EVENT_QUEUE_ENQUEUE;
queue_set_sub_info(sub_para, (unsigned int)event_id, &sub_info);
return subscribe_queue(sub_para->devId, sub_para->qid, sub_info, sub_para->queType);
}
static drvError_t queue_sub_f2nf_event(struct QueueSubPara *sub_para)
{
unsigned int event_id = (unsigned int)EVENT_QUEUE_FULL_TO_NOT_FULL;
struct sub_info sub_info = {0};
queue_set_sub_info(sub_para, event_id, &sub_info);
return sub_f_to_nf_event(sub_para->devId, sub_para->qid, sub_info);
}
drvError_t queue_sub_event_local(struct QueueSubPara *sub_para)
{
drvError_t ret;
ret = queue_sub_para_check(sub_para);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_ERR("sub_para check failed.\n");
return ret;
}
switch (sub_para->eventType) {
case QUEUE_ENQUE_EVENT:
return queue_sub_enque_event(sub_para);
case QUEUE_F2NF_EVENT:
return queue_sub_f2nf_event(sub_para);
default:
QUEUE_LOG_ERR("not support event type. (event_type=%u)\n", sub_para->eventType);
return DRV_ERROR_NOT_SUPPORT;
}
}
static drvError_t queue_unsub_para_check(struct QueueUnsubPara *unsub_para)
{
struct queue_manages *manage = NULL;
drvError_t ret;
if ((unsub_para == NULL) || (unsub_para->eventType >= QUEUE_EVENT_TYPE_MAX)) {
QUEUE_LOG_ERR("sub_para is NULL. (sub_para_is_null=%d)\n", unsub_para == NULL);
return DRV_ERROR_PARA_ERROR;
}
ret = get_queue_manage_by_qid(unsub_para->devId, unsub_para->qid, &manage);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_WARN("para is invalid. (dev_id=%u; qid=%u)\n", unsub_para->devId, unsub_para->qid);
return ret;
}
if (manage->valid != QUEUE_CREATED) {
QUEUE_LOG_WARN("queue is not created. (dev_id=%u; qid=%u)\n", unsub_para->devId, unsub_para->qid);
return DRV_ERROR_NOT_EXIST;
}
return DRV_ERROR_NONE;
}
static drvError_t queue_unsub_enque_event(struct QueueUnsubPara *unsub_para)
{
struct queue_manages *manage = NULL;
drvError_t ret;
manage = queue_get_local_mng(unsub_para->qid);
if (manage->bind_type == QUEUE_TYPE_GROUP) {
ret = queue_merge_uninit(manage, unsub_para->qid);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_ERR("delete queue merge flag failed. (qid=%u)\n", unsub_para->qid);
return ret;
}
}
manage->consumer.eventid = 0;
* As a result, the event_flag may be fixed to 1.
*/
wmb();
manage->consumer.pid = 0;
manage->consumer.groupid = 0;
manage->consumer.dst_engine = 0;
manage->consumer.sub_send = 0;
manage->consumer.tid = 0;
manage->consumer.spec_thread = false;
manage->consumer.inner_sub_flag = 0;
manage->bind_type = 0;
manage->event_flag = 0;
manage->empty_flag = 0;
QUEUE_LOG_INFO("unsub success. (dev_id=%u; queue_id=%u)\n", unsub_para->devId, unsub_para->qid);
return DRV_ERROR_NONE;
}
static drvError_t queue_unsub_f2nf_event(struct QueueUnsubPara *unsub_para)
{
static struct timeval last_log_time = {0};
struct timeval current_time;
struct queue_manages *manage = NULL;
manage = queue_get_local_mng(unsub_para->qid);
manage->producer.eventid = 0;
wmb();
manage->producer.groupid = 0;
manage->producer.pid = 0;
manage->producer.dst_engine = 0;
manage->producer.sub_send = 0;
manage->producer.inner_sub_flag = 0;
que_get_time(¤t_time);
if ((current_time.tv_sec - last_log_time.tv_sec) > LOG_TIME_INTERVAL) {
last_log_time = current_time;
QUEUE_RUN_LOG_INFO("unsub F2NF event success. (dev_id=%u; queue_id=%u; enque_full=%llu)\n",
unsub_para->devId, unsub_para->qid, manage->stat.enque_full);
}
return DRV_ERROR_NONE;
}
drvError_t queue_unsub_event_local(struct QueueUnsubPara *unsub_para)
{
drvError_t ret;
ret = queue_unsub_para_check(unsub_para);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_WARN("sub_para check invalid.\n");
return ret;
}
switch (unsub_para->eventType) {
case QUEUE_ENQUE_EVENT:
return queue_unsub_enque_event(unsub_para);
case QUEUE_F2NF_EVENT:
return queue_unsub_f2nf_event(unsub_para);
default:
QUEUE_LOG_ERR("not support event type. (event_type=%u)\n", unsub_para->eventType);
return DRV_ERROR_NOT_SUPPORT;
}
}
STATIC drvError_t queue_ctrl_event_local(struct QueueSubscriber *subscriber, QUE_EVENT_CMD cmd_type)
{
if (subscriber == NULL) {
QUEUE_LOG_ERR("subscriber para is null.\n");
return DRV_ERROR_INVALID_VALUE;
}
if (subscriber->devId >= MAX_DEVICE || (subscriber->spGrpId < 0 || subscriber->spGrpId >= MAX_SHARE_GRP) ||
subscriber->pid <= 0) {
QUEUE_LOG_ERR("devid sp_grp_id invalid. (dev_id=%u, sp_grp_id=%d)\n", subscriber->devId, subscriber->spGrpId);
return DRV_ERROR_INVALID_VALUE;
}
return queue_merge_ctrl(subscriber->devId, subscriber->pid, (unsigned int)subscriber->groupId, cmd_type);
}
drvError_t queue_query_info_local(unsigned int dev_id, unsigned int qid, QueueInfo *que_info)
{
struct queue_manages *que_manage = NULL;
queue_entity_node *que_entity = NULL;
unsigned int depth;
drvError_t ret;
ret = get_queue_manage_by_qid(dev_id, qid, &que_manage);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_ERR("get_queue_manage_by_qid failed. (qid=%u; ret=%d)\n", qid, ret);
return ret;
}
if (!queue_get(qid)) {
QUEUE_LOG_ERR("queue get failed. (qid=%u)\n", qid);
return DRV_ERROR_NOT_EXIST;
}
if (que_manage->valid != QUEUE_CREATED) {
queue_put(qid);
QUEUE_LOG_ERR("queue is not created. (qid=%u)\n", qid);
return DRV_ERROR_NOT_EXIST;
}
queue_get_entity_and_depth(qid, &que_entity, &depth);
if (que_entity == NULL) {
queue_put(qid);
QUEUE_LOG_ERR("queue entity is NULL.(qid=%u)\n", qid);
return DRV_ERROR_NOT_EXIST;
}
que_info->depth = (int)depth;
que_info->headDataPtr = (void *)(is_queue_empty(que_manage) ? NULL : que_entity[queue_get_head(que_manage, qid)].node);
que_info->id = (int)qid;
que_info->size = (int)get_que_size_by_mng(que_manage, qid);
que_info->type = que_manage->bind_type;
que_info->workMode = (que_manage->work_mode == 0) ? QUEUE_MODE_PUSH : que_manage->work_mode;
que_info->subGroupId = (int)que_manage->consumer.groupid;
que_info->subPid = que_manage->consumer.pid;
que_info->subF2NFPid = que_manage->producer.pid;
que_info->subF2NFGroupId = (int)que_manage->producer.groupid;
que_info->stat.dequeCnt = que_manage->stat.deque_ok;
que_info->stat.enqueCnt = que_manage->stat.enque_ok;
que_info->stat.enqueFailCnt = que_manage->stat.enque_fail;
que_info->stat.dequeFailCnt = que_manage->stat.deque_fail;
que_info->stat.enqueEventOk = que_manage->stat.enque_event_ok;
que_info->stat.enqueEventFail = que_manage->stat.enque_event_fail;
que_info->stat.f2nfEventOk = que_manage->stat.f2nf_event_ok;
que_info->stat.f2nfEventFail = que_manage->stat.f2nf_event_fail;
que_info->stat.lastEnqueTime = que_manage->stat.last_enque_time;
que_info->stat.lastDequeTime = que_manage->stat.last_deque_time;
que_info->entity_type = 0;
if (is_queue_full(que_manage, qid)) {
que_info->status = QUEUE_FULL;
} else {
que_info->status = (int)(is_queue_empty(que_manage) ? QUEUE_EMPTY : QUEUE_NORMAL);
}
ret = strncpy_s(que_info->name, MAX_STR_LEN, que_manage->name, strnlen(que_manage->name, MAX_STR_LEN));
if (ret != DRV_ERROR_NONE) {
queue_put(qid);
QUEUE_LOG_ERR("strncpy queue name failed. (ret=%d)\n", ret);
return DRV_ERROR_PARA_ERROR;
}
queue_put(qid);
return DRV_ERROR_NONE;
}
static drvError_t queue_get_status_para_check(QUEUE_QUERY_ITEM query_item, unsigned int len, void *data)
{
(void)data;
switch (query_item) {
case QUERY_QUEUE_DROP_STAT:
if (len != sizeof(QUEUE_DROP_PKT_STAT)) {
QUEUE_LOG_ERR("len invalid. (len=%u; expect=%zu)\n", len, sizeof(QUEUE_DROP_PKT_STAT));
return DRV_ERROR_INVALID_VALUE;
}
break;
case QUERY_QUEUE_STATUS:
if (len != sizeof(QUEUE_STATUS)) {
QUEUE_LOG_ERR("len invalid. (len=%u; expect=%zu)\n", len, sizeof(QUEUE_STATUS));
return DRV_ERROR_INVALID_VALUE;
}
break;
case QUERY_QUEUE_DEPTH:
if (len != sizeof(int)) {
QUEUE_LOG_ERR("len invalid. (len=%u; expect=%zu)\n", len, sizeof(int));
return DRV_ERROR_INVALID_VALUE;
}
break;
default:
QUEUE_LOG_WARN("query_item not support. (query_item=%u)\n", (unsigned int)query_item);
return DRV_ERROR_INVALID_VALUE;
}
return DRV_ERROR_NONE;
}
drvError_t queue_get_status_local(unsigned int dev_id, unsigned int qid, QUEUE_QUERY_ITEM query_item,
unsigned int len, void *data)
{
struct queue_manages *que_manage = NULL;
drvError_t ret;
ret = queue_get_status_para_check(query_item, len, data);
if (ret != DRV_ERROR_NONE) {
return ret;
}
ret = get_queue_manage_by_qid(dev_id, qid, &que_manage);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_WARN("can not get_queue_manage_by_qid. (dev_id=%u; qid=%u; ret=%d)\n", dev_id, qid, ret);
return ret;
}
if (que_manage->valid != QUEUE_CREATED) {
QUEUE_LOG_WARN("queue is not created.(qid=%u)\n", qid);
return DRV_ERROR_NOT_EXIST;
}
switch (query_item) {
case QUERY_QUEUE_DROP_STAT:
((QUEUE_DROP_PKT_STAT *)data)->enqueDropCnt = que_manage->stat.enque_drop;
((QUEUE_DROP_PKT_STAT *)data)->dequeDropCnt = que_manage->stat.deque_drop;
break;
case QUERY_QUEUE_STATUS:
if (is_queue_full(que_manage, qid)) {
*((int *)data) = QUEUE_FULL;
} else {
*((int *)data) = (int)(is_queue_empty(que_manage) ?
QUEUE_EMPTY : QUEUE_NORMAL);
}
break;
case QUERY_QUEUE_DEPTH:
*((unsigned int *)data) = queue_get_local_depth(qid);
break;
default:
QUEUE_LOG_WARN("query_item not support now. (query_item=%u)\n", (unsigned int)query_item);
return DRV_ERROR_PARA_ERROR;
}
return DRV_ERROR_NONE;
}
STATIC drvError_t queue_get_qid_by_name_local(unsigned int dev_id, const char *name, unsigned int *qid)
{
struct queue_manages *que_manage = NULL;
unsigned long len;
unsigned int i;
if ((queue_device_invalid(dev_id)) || (name == NULL) || (qid == NULL)) {
QUEUE_LOG_ERR("para invalid. (dev_id=%u)\n", dev_id);
return DRV_ERROR_INVALID_VALUE;
}
len = strnlen(name, MAX_STR_LEN);
if ((len >= MAX_STR_LEN) || (len == 0)) {
QUEUE_LOG_ERR("name len is err. (len=%lu; max len=%d)\n", len, MAX_STR_LEN);
return DRV_ERROR_INVALID_VALUE;
}
for (i = 0; i < MAX_SURPORT_QUEUE_NUM; i++) {
que_manage = queue_get_local_mng(i);
if (que_manage == NULL) {
continue;
}
if (que_manage->valid == QUEUE_CREATED) {
if (strncmp(name, que_manage->name, (MAX_STR_LEN - 1)) == 0 && que_manage->dev_id == dev_id) {
*qid = que_manage->id;
return DRV_ERROR_NONE;
}
}
}
QUEUE_LOG_INFO("device can not find queue named. (dev_id=%u; name=%s)\n", dev_id, name);
return DRV_ERROR_NOT_EXIST;
}
STATIC drvError_t queue_get_qidsby_pid_local(unsigned int dev_id, unsigned int pid,
unsigned int max_que_size, QidsOfPid *info)
{
struct queue_manages *que_manage = NULL;
unsigned int *qids = NULL;
uint32 index = 0;
unsigned int i;
if (queue_device_invalid(dev_id) || (pid == 0) || (max_que_size == 0) || (info == NULL)) {
QUEUE_LOG_ERR("devid,pid or max_que_size is invalid. (dev_id=%u; pid=%u; max_que_size=%u)\n",
dev_id, pid, max_que_size);
return DRV_ERROR_INVALID_VALUE;
}
qids = info->qids;
if (qids == NULL) {
QUEUE_LOG_ERR("qids is invalid.\n");
return DRV_ERROR_INVALID_VALUE;
}
for (i = 0; i < MAX_SURPORT_QUEUE_NUM; i++) {
que_manage = queue_get_local_mng(i);
if (que_manage == NULL) {
continue;
}
if ((que_manage->valid == QUEUE_CREATED) && ((unsigned int)que_manage->creator_pid == pid) &&
(que_manage->dev_id == dev_id)) {
if (index >= max_que_size) {
QUEUE_LOG_ERR("max_que_size is not enough. (index=%d; max_que_size=%u)\n", index, max_que_size);
return DRV_ERROR_OVER_LIMIT;
}
qids[index] = que_manage->id;
index++;
info->queNum = index;
}
}
QUEUE_LOG_INFO("devid pid find queue num. (dev_id=%u; pid=%u; que_num=%u)\n", dev_id, pid, info->queNum);
return DRV_ERROR_NONE;
}
static drvError_t query_the_que_perm_info(unsigned int dev_id, QueueQueryInputPara *in_put, QueueQueryOutputPara *out_put)
{
QueueQueryOutput *out_buff = (QueueQueryOutput *)(out_put->outBuff);
QueueQueryInput *in_buff = (QueueQueryInput *)(in_put->inBuff);
QueueShareAttr attr = { .manage = 1, .read = 1, .write = 1 };
struct queue_manages *que_manage = NULL;
unsigned int qid;
if ((in_buff == NULL) || (in_put->inLen < sizeof(QueQueryQueueAttr)) ||
(out_put->outLen < sizeof(QueQueryQueueAttrInfo))) {
QUEUE_LOG_ERR("Input para error. (in_buff=%p; in_len=%u; out_len=%u)\n", in_buff, in_put->inLen, out_put->outLen);
return DRV_ERROR_INVALID_VALUE;
}
qid = (unsigned int)in_buff->queQueryQueueAttr.qid;
if (qid >= MAX_SURPORT_QUEUE_NUM) {
QUEUE_LOG_ERR("Input para error. (qid=%u)\n", qid);
return DRV_ERROR_INVALID_VALUE;
}
que_manage = queue_get_local_mng(qid);
if ((que_manage == NULL) || (que_manage->valid != QUEUE_CREATED) || (que_manage->dev_id != dev_id)) {
(void)memset_s(&out_buff->queQueryQueueAttrInfo.attr, sizeof(QueueShareAttr), 0, sizeof(QueueShareAttr));
} else {
out_buff->queQueryQueueAttrInfo.attr = attr;
}
out_put->outLen = (unsigned int)sizeof(QueQueryQueueAttrInfo);
return DRV_ERROR_NONE;
}
static drvError_t query_ques_perm_info(unsigned int dev_id, QueueQueryInputPara *in_put, QueueQueryOutputPara *out_put)
{
(void)in_put;
QueueQueryOutput *out_buff = (QueueQueryOutput *)(out_put->outBuff);
unsigned int max_num = out_put->outLen / (unsigned int)sizeof(QueQueryQuesOfProcInfo);
QueueShareAttr attr = { .manage = 1, .read = 1, .write = 1 };
struct queue_manages *que_manage = NULL;
unsigned int cnt = 0;
unsigned int qid;
for (qid = 0; qid < MAX_SURPORT_QUEUE_NUM; qid++) {
que_manage = queue_get_local_mng(qid);
if ((que_manage == NULL) || (que_manage->valid != QUEUE_CREATED) || (que_manage->dev_id != dev_id)) {
continue;
}
if (cnt >= max_num) {
QUEUE_LOG_ERR("output buff not enough. (cnt=%u; max_num=%u; len=%u)\n",
cnt, max_num, out_put->outLen);
return DRV_ERROR_INVALID_VALUE;
}
out_buff->queQueryQuesOfProcInfo[cnt].qid = (int)qid;
out_buff->queQueryQuesOfProcInfo[cnt].attr = attr;
cnt++;
}
out_put->outLen = cnt * (unsigned int)sizeof(QueQueryQuesOfProcInfo);
QUEUE_RUN_LOG_INFO("query_ques_perm_info. (dev_id=%u; cnt=%u)\n", dev_id, cnt);
return DRV_ERROR_NONE;
}
static drvError_t query_queue_mbuf_info(unsigned int dev_id, QueueQueryInputPara *in_put, QueueQueryOutputPara *out_put)
{
QueueQueryOutput *out_buff = (QueueQueryOutput *)(out_put->outBuff);
QueueQueryInput *in_buff = (QueueQueryInput *)(in_put->inBuff);
Mbuf *mbuf = NULL;
unsigned int qid;
drvError_t ret;
if ((in_buff == NULL) || (in_put->inLen < sizeof(QueQueryQueueMbuf)) ||
(out_put->outLen < sizeof(QueQueryQueueMbufInfo))) {
QUEUE_LOG_ERR("Input para error. (in_buff=%p; in_len=%u; out_len=%u)\n", in_buff, in_put->inLen, out_put->outLen);
return DRV_ERROR_INVALID_VALUE;
}
qid = in_buff->queQueryQueueMbuf.qid;
if (qid >= MAX_SURPORT_QUEUE_NUM) {
QUEUE_LOG_ERR("Input para error. (qid=%u)\n", qid);
return DRV_ERROR_INVALID_VALUE;
}
ret = queue_front(dev_id, qid, &mbuf);
if (ret != 0) {
return ret;
}
out_buff->queQueryQueueMbufInfo.timestamp = mbuf_get_timestamp(mbuf);
out_put->outLen = (unsigned int)sizeof(QueQueryQueueMbufInfo);
queue_un_front(mbuf);
return ret;
}
static drvError_t query_queue_max_iovec_num(unsigned int dev_id, QueueQueryInputPara *in_put, QueueQueryOutputPara *out_put)
{
(void)dev_id;
(void)in_put;
QueueQueryOutput *out_buff = (QueueQueryOutput *)(out_put->outBuff);
if (out_put->outLen < sizeof(QueQueryMaxIovecNum)) {
QUEUE_LOG_ERR("Input para error. (out_len=%u)\n", out_put->outLen);
return DRV_ERROR_INVALID_VALUE;
}
out_buff->queQueryMaxIovecNum.count = QUEUE_MAX_IOVEC_NUM;
return DRV_ERROR_NONE;
}
static drvError_t (*g_queue_query[QUEUE_QUERY_CMD_MAX])
(unsigned int dev_id, QueueQueryInputPara *in_put, QueueQueryOutputPara *out_put) = {
[QUEUE_QUERY_QUE_ATTR_OF_CUR_PROC] = query_the_que_perm_info,
[QUEUE_QUERY_QUES_OF_CUR_PROC] = query_ques_perm_info,
[QUEUE_QUERY_QUEUE_MBUF_INFO] = query_queue_mbuf_info,
[QUEUE_QUERY_MAX_IOVEC_NUM] = query_queue_max_iovec_num,
};
STATIC drvError_t queue_query_local(unsigned int dev_id, QueueQueryCmdType cmd,
QueueQueryInputPara *in_put, QueueQueryOutputPara *out_put)
{
if (g_queue_query[cmd] == NULL) {
return DRV_ERROR_NOT_SUPPORT;
}
return g_queue_query[cmd](dev_id, in_put, out_put);
}
static drvError_t queue_peek_data_copy_ref(unsigned int dev_id, unsigned int qid, unsigned int flag, Mbuf **mbuf)
{
Mbuf *priv_mbuf = NULL;
drvError_t ret;
ret = queue_front(dev_id, qid, &priv_mbuf);
if (ret) {
return ret;
}
ret = (drvError_t)halMbufCopyRef(priv_mbuf, mbuf);
if (ret) {
QUEUE_LOG_ERR("mbuf copy failed. (dev_id=%u; qid=%u; flag=%u; ret=%d)\n", dev_id, qid, flag, ret);
}
queue_un_front(priv_mbuf);
return ret;
}
static drvError_t (*g_peek_data[QUEUE_PEEK_DATA_TYPE_MAX])
(unsigned int dev_id, unsigned int qid, unsigned int flag, Mbuf **mbuf) = {
[QUEUE_PEEK_DATA_COPY_REF] = queue_peek_data_copy_ref,
};
drvError_t queue_peek_data_local(unsigned int dev_id, unsigned int qid, unsigned int flag, QueuePeekDataType type,
void **mbuf)
{
if (g_peek_data[type] == NULL) {
QUEUE_LOG_INFO("no support this type. (dev_id=%u; qid=%u; flag=%u; type=%u)\n", dev_id, qid, flag, type);
return DRV_ERROR_NOT_SUPPORT;
}
return g_peek_data[type](dev_id, qid, flag, (Mbuf **)mbuf);
}
static drvError_t queue_set_work_mode_para_check(unsigned int dev_id, QueueSetInput *input)
{
QueueSetWorkMode set_work_mode = input->queSetWorkMode;
if (queue_device_invalid(dev_id)) {
QUEUE_LOG_ERR("para is error. (dev_id=%u)\n", dev_id);
return DRV_ERROR_INVALID_DEVICE;
}
if (((set_work_mode.workMode != QUEUE_MODE_PUSH) && (set_work_mode.workMode != QUEUE_MODE_PULL)) ||
(set_work_mode.qid >= MAX_SURPORT_QUEUE_NUM)) {
QUEUE_LOG_ERR("Input para error. (workMode=%u; qid=%u)\n", set_work_mode.workMode, set_work_mode.qid);
return DRV_ERROR_INVALID_VALUE;
}
return DRV_ERROR_NONE;
}
static drvError_t queue_set_work_mode(unsigned int dev_id, QueueSetInput *input)
{
QueueSetWorkMode set_work_mode = input->queSetWorkMode;
struct queue_manages *manage = NULL;
drvError_t ret;
ret = queue_set_work_mode_para_check(dev_id, input);
if (ret != DRV_ERROR_NONE) {
return ret;
}
manage = queue_get_local_mng(set_work_mode.qid);
if (manage == NULL) {
QUEUE_LOG_ERR("queue local mng is null. (dev_id=%u; qid=%u)\n", dev_id, set_work_mode.qid);
return DRV_ERROR_NOT_EXIST;
}
if (manage->valid != QUEUE_CREATED) {
QUEUE_LOG_ERR("queue is not created. (qid=%u; valid=%d; QUEUE_CREATED=%d)\n",
set_work_mode.qid, manage->valid, QUEUE_CREATED);
return DRV_ERROR_NOT_EXIST;
}
if (set_work_mode.workMode == QUEUE_MODE_PULL && manage->bind_type == QUEUE_TYPE_GROUP) {
QUEUE_LOG_ERR("queue workMode can not be set to QUEUE_MODE_PULL. (qid=%u; workMode=%u; "
"bind_type=%d)\n", set_work_mode.qid, set_work_mode.workMode, manage->bind_type);
return DRV_ERROR_INVALID_VALUE;
}
manage->work_mode = (int)(set_work_mode.workMode);
return DRV_ERROR_NONE;
}
static drvError_t (*g_queue_set[QUEUE_SET_CMD_MAX]) (unsigned int dev_id, QueueSetInput *input) = {
[QUEUE_SET_WORK_MODE] = queue_set_work_mode,
};
drvError_t queue_set_local(unsigned int dev_id, QueueSetCmdType cmd, QueueSetInputPara *input)
{
if ((dev_id >= MAX_DEVICE) || (input == NULL) || (cmd >= QUEUE_SET_CMD_MAX) ||
(g_queue_set[cmd] == NULL)) {
QUEUE_LOG_ERR("Input para error. (dev_id=%u; cmd=%u; input=%p)\n", dev_id, cmd, input);
return DRV_ERROR_INVALID_VALUE;
}
if ((input->inBuff == NULL) || (input->inLen != sizeof(QueueSetInput))) {
QUEUE_LOG_ERR("Input para error. (in_buff=%p; in_len=%u)\n", input->inBuff, input->inLen);
return DRV_ERROR_INVALID_VALUE;
}
return g_queue_set[cmd](dev_id, (QueueSetInput *)input->inBuff);
}
static drvError_t queue_front_para_check(unsigned int dev_id, unsigned int qid)
{
if (queue_device_invalid(dev_id)) {
QUEUE_LOG_ERR("para is error. (dev_id=%u)\n", dev_id);
return DRV_ERROR_INVALID_DEVICE;
}
if (qid >= MAX_SURPORT_QUEUE_NUM) {
QUEUE_LOG_ERR("para is error. (qid=%u)\n", qid);
return DRV_ERROR_INVALID_VALUE;
}
return DRV_ERROR_NONE;
}
drvError_t queue_front(unsigned int dev_id, unsigned int qid, Mbuf **mbuf)
{
union atomic_queue_head cur_head;
struct queue_manages *que_manage = NULL;
queue_entity_node *que_entity = NULL;
void *buff = NULL;
unsigned int depth;
drvError_t ret;
uint32_t blk_id;
uint32_t head;
ret = queue_front_para_check(dev_id, qid);
if (ret != DRV_ERROR_NONE) {
return ret;
}
if (!queue_get(qid)) {
QUEUE_LOG_ERR("queue get failed. (qid=%u)\n", qid);
return DRV_ERROR_NOT_EXIST;
}
que_manage = queue_get_local_mng(qid);
if (que_manage == NULL) {
queue_put(qid);
QUEUE_LOG_ERR("Queue local manage is null. (qid=%u)\n", qid);
return DRV_ERROR_NOT_EXIST;
}
if (que_manage->valid != QUEUE_CREATED) {
queue_put(qid);
QUEUE_LOG_ERR("queue is not created. (qid=%u)\n", qid);
return DRV_ERROR_NOT_EXIST;
}
if (is_queue_empty(que_manage)) {
que_manage->empty_flag = 1;
queue_put(qid);
return DRV_ERROR_QUEUE_EMPTY;
}
queue_get_entity_and_depth(qid, &que_entity, &depth);
if (que_entity == NULL) {
queue_put(qid);
QUEUE_LOG_ERR("queue entity is NULL.(qid=%u)\n", qid);
return DRV_ERROR_NOT_EXIST;
}
cur_head.head_value = queue_get_head_value(que_manage);
head = ((cur_head.head_info.head) % queue_get_local_depth(qid));
buff = que_entity[head].node;
blk_id = que_entity[head].node_desp.blk_id;
if (buff == NULL) {
que_manage->empty_flag = 1;
queue_put(qid);
return DRV_ERROR_QUEUE_EMPTY;
}
* If the mbuf is illegal, the error code DRV_ERROR_NO_RESOURCES needs to be returned and the data discarded.
* The upper layer will perform selective processing based on the error code.
*/
ret = create_priv_mbuf_for_queue(mbuf, buff, blk_id);
if (ret == DRV_ERROR_NO_RESOURCES) {
union atomic_queue_head new_head;
#ifndef EMU_ST
new_head.head_value = queue_head_inc(cur_head, depth);
(void)CAS(&que_manage->queue_head.head_value, cur_head.head_value, new_head.head_value);
#endif
}
queue_put(qid);
return ret;
}
void queue_un_front(Mbuf *mbuf)
{
destroy_priv_mbuf_for_queue(mbuf);
}
void queue_update_time(unsigned int dev_id, unsigned int qid, unsigned int host_time, unsigned int msg_type)
{
struct queue_manages *que_manage = NULL;
drvError_t ret;
ret = queue_front_para_check(dev_id, qid);
if (ret != DRV_ERROR_NONE) {
return;
}
if (!queue_get(qid)) {
QUEUE_LOG_ERR("queue get failed. (qid=%u)\n", qid);
return;
}
que_manage = queue_get_local_mng(qid);
if (que_manage == NULL) {
queue_put(qid);
QUEUE_LOG_ERR("Queue local manage is null. (qid=%u)\n", qid);
return;
}
if (que_manage->valid != QUEUE_CREATED) {
queue_put(qid);
QUEUE_LOG_ERR("queue is not created. (qid=%u)\n", qid);
return;
}
if (msg_type == DRV_SUBEVENT_DEQUEUE_MSG) {
ATOMIC_SET(&que_manage->stat.last_deque_time.tv_usec, host_time);
} else {
ATOMIC_SET(&que_manage->stat.last_enque_time.tv_usec, host_time);
}
queue_put(qid);
return;
}
drvError_t queue_get_qid_create_time(unsigned int dev_id, unsigned int qid, unsigned long *create_time)
{
struct queue_manages *que_mng = NULL;
drvError_t ret;
ret = get_queue_manage_by_qid(dev_id, qid, &que_mng);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_ERR("get queue manage failed. (qid=%u; ret=%d)\n", qid, ret);
return ret;
}
if (!queue_get(qid)) {
QUEUE_LOG_ERR("queue get failed. (qid=%u)\n", qid);
return DRV_ERROR_NOT_EXIST;
}
if (que_mng->valid != QUEUE_CREATED) {
queue_put(qid);
QUEUE_LOG_ERR("queue is not created. (qid=%u)\n", qid);
return DRV_ERROR_NOT_EXIST;
}
*create_time = que_mng->create_time;
queue_put(qid);
return DRV_ERROR_NONE;
}
bool queue_is_inter_dev(unsigned int dev_id, unsigned int qid)
{
struct queue_manages *que_mng = NULL;
drvError_t ret;
ret = get_queue_manage_by_qid(dev_id, qid, &que_mng);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_ERR("get que manage failed. (qid=%u; ret=%d)\n", qid, ret);
return false;
}
if (!queue_get(qid)) {
QUEUE_LOG_ERR("que get failed. (qid=%u)\n", qid);
return false;
}
if (que_mng->valid != QUEUE_CREATED) {
queue_put(qid);
QUEUE_LOG_ERR("que is not created. (qid=%u)\n", qid);
return false;
}
if ((que_mng->inter_dev_state == QUEUE_STATE_IMPORTED) || (que_mng->inter_dev_state == QUEUE_STATE_EXPORTED)) {
queue_put(qid);
return true;
}
queue_put(qid);
return false;
}
STATIC struct queue_comm_interface_list g_core_interface = {
.queue_dc_init = queue_init_local,
.queue_uninit = NULL,
.queue_create = queue_create_local,
.queue_grant = queue_grant_local,
.queue_attach = queue_attach_local,
.queue_destroy = queue_destroy_local,
.queue_reset = queue_reset_local,
.queue_en_queue = queue_enqueue_local,
.queue_de_queue = queue_dequeue_local,
.queue_subscribe = queue_subscribe_local,
.queue_unsubscribe = queue_unsubscribe_local,
.queue_sub_f_to_nf_event = queue_sub_f_to_nf_event_local,
.queue_unsub_f_to_nf_event = queue_unsub_f_to_nf_event_local,
.queue_sub_event = queue_sub_event_local,
.queue_unsub_event = queue_unsub_event_local,
.queue_ctrl_event = queue_ctrl_event_local,
.queue_query_info = queue_query_info_local,
.queue_get_status = queue_get_status_local,
.queue_get_qid_by_name = queue_get_qid_by_name_local,
.queue_get_qids_by_pid = queue_get_qidsby_pid_local,
.queue_query = queue_query_local,
.queue_peek_data = queue_peek_data_local,
.queue_set = queue_set_local,
.queue_finish_cb = queue_finish_callback_local,
.queue_export = NULL,
.queue_unexport = NULL,
.queue_import = NULL,
.queue_unimport = NULL,
};
STATIC int __attribute__((constructor)) queue_core_init(void)
{
queue_set_comm_interface(LOCAL_QUEUE, &g_core_interface);
return 0;
}