* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
#include <stdlib.h>
#include <pthread.h>
#include "securec.h"
#include "ascend_hal_error.h"
#ifndef EMU_ST
#include "svm_user_interface.h"
#endif
#include "uref.h"
#include "queue_interface.h"
#include "queue.h"
#include "que_compiler.h"
#include "que_urma.h"
#include "que_jetty.h"
#include "que_ub_msg.h"
#include "que_comm_event.h"
#include "que_comm_thread.h"
#include "que_comm_chan.h"
#include "dms_user_interface.h"
#include "que_comm_ctx.h"
#ifndef EMU_ST
#define QUE_PKT_RECV_DEPTH 4096U
#define QUE_ACK_SEND_DEPTH 4096U
#define QUE_URMA_WAIT_TIME 500
static void que_jfs_pool_free(int qjfs_num, struct que_jfs_pool_info *jfs_pool)
{
int idx;
for (idx = 0; idx < qjfs_num; idx++) {
if (que_likely(jfs_pool[idx].qjfs != NULL)) {
que_jfs_destroy(jfs_pool[idx].qjfs);
jfs_pool[idx].qjfs = NULL;
}
}
}
static int que_jfs_pool_create(unsigned int devid, struct que_jfs_pool_info *jfs_pool, unsigned int d2d_flag)
{
int idx;
struct que_jfs_attr jfs_attr = {.jfs_depth = QUE_SEND_PKT_DEPTH, .jfc_s_depth = QUE_SEND_PKT_DEPTH, .priority = QUE_JFS_MEDIUM_PRIORITY};
for (idx = 0; idx < QUE_PKT_SEND_JETTY_POOL_DEPTH; idx++) {
jfs_pool[idx].qjfs = que_jfs_create(devid, &jfs_attr, d2d_flag);
if (que_unlikely(jfs_pool[idx].qjfs == NULL)) {
que_jfs_pool_free(idx, jfs_pool);
return DRV_ERROR_INNER_ERR;
}
jfs_pool[idx].jfs_busy_flag = false;
}
return DRV_ERROR_NONE;
}
static void que_jfr_pool_free(int qjfr_num, struct que_jfr_pool_info *jfr_pool)
{
int idx;
for (idx = 0; idx < qjfr_num; idx++) {
if (que_likely(jfr_pool[idx].qjfr != NULL)) {
que_jfr_destroy(jfr_pool[idx].qjfr);
jfr_pool[idx].qjfr = NULL;
}
}
}
static int que_jfr_pool_create(unsigned int devid, struct que_jfr_pool_info *jfr_pool, unsigned int d2d_flag)
{
int idx;
struct que_jfr_attr jfr_attr = {.jfr_depth = QUE_ACK_PKT_DEPTH, .jfc_r_depth = QUE_ACK_PKT_DEPTH};
for (idx = 0; idx < QUE_PKT_SEND_JETTY_POOL_DEPTH; idx++) {
jfr_pool[idx].qjfr = que_jfr_create(devid, &jfr_attr, d2d_flag);
if (que_unlikely(jfr_pool[idx].qjfr == NULL)) {
que_jfr_pool_free(idx, jfr_pool);
return DRV_ERROR_INNER_ERR;
}
jfr_pool[idx].jfr_busy_flag = false;
}
return DRV_ERROR_NONE;
}
static void que_uma_recv_destroy_pool(int recv_num, struct que_jfr_pool_info *jfr_pool)
{
int idx;
for (idx = 0; idx < recv_num; idx++) {
if (que_likely(jfr_pool[idx].recv_para != NULL)) {
que_uma_recv_destroy(jfr_pool[idx].recv_para);
jfr_pool[idx].recv_para = NULL;
}
}
}
static int que_uma_recv_create_pool(unsigned int devid, struct que_jfr_pool_info *jfr_pool, unsigned int d2d_flag)
{
int idx;
struct que_uma_recv_attr recv_attr = {.num = QUE_ACK_PKT_DEPTH, .size = QUE_ACK_PKT_SIZE};
for (idx = 0; idx < QUE_PKT_SEND_JETTY_POOL_DEPTH; idx++) {
jfr_pool[idx].recv_para = que_uma_recv_create(devid, jfr_pool[idx].qjfr, &recv_attr, d2d_flag);
if (que_unlikely(jfr_pool[idx].recv_para == NULL)) {
que_uma_recv_destroy_pool(idx, jfr_pool);
}
}
return DRV_ERROR_NONE;
}
static int que_jfs_pool_create_for_pkt_send(struct que_ctx *ctx, unsigned int d2d_flag)
{
int ret;
struct que_jfs_pool_info jfs_pool[QUE_PKT_SEND_JETTY_POOL_DEPTH] = {0};
ret = que_jfs_pool_create(ctx->devid, jfs_pool, d2d_flag);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
QUEUE_LOG_ERR("que jfs pool create fail. (ret=%d; devid=%u; d2d_flag=%u)\n", ret, ctx->devid, d2d_flag);
return ret;
}
ret = memcpy_s((void *)ctx->jfs_pool[d2d_flag], sizeof(struct que_jfs_pool_info) * QUE_PKT_SEND_JETTY_POOL_DEPTH,
(void *)jfs_pool, sizeof(struct que_jfs_pool_info) * QUE_PKT_SEND_JETTY_POOL_DEPTH);
if (que_unlikely(ret != EOK)) {
QUEUE_LOG_ERR("Que jfs pool copy fail. (ret=%d; devid=%u; d2d_flag=%u)\n", ret, ctx->devid, d2d_flag);
ret = DRV_ERROR_INNER_ERR;
goto jfs_pool_free;
}
return DRV_ERROR_NONE;
jfs_pool_free:
que_jfs_pool_free(QUE_PKT_SEND_JETTY_POOL_DEPTH, jfs_pool);
return ret;
}
static void que_jfs_pool_free_for_pkt_send(struct que_ctx *ctx, unsigned int d2d_flag)
{
que_jfs_pool_free(QUE_PKT_SEND_JETTY_POOL_DEPTH, ctx->jfs_pool[d2d_flag]);
}
static int que_jfr_pool_create_for_ack_recv(struct que_ctx *ctx, unsigned int d2d_flag)
{
int ret;
struct que_jfr_pool_info jfr_pool[QUE_PKT_SEND_JETTY_POOL_DEPTH] = {0};
ret = que_jfr_pool_create(ctx->devid, jfr_pool, d2d_flag);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
QUEUE_LOG_ERR("Que jfr pool create fail. (ret=%d; devid=%u)\n", ret, ctx->devid);
return ret;
}
ret = que_uma_recv_create_pool(ctx->devid, jfr_pool, d2d_flag);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
QUEUE_LOG_ERR("que uma recv create fail. (ret=%d; devid=%u)\n", ret, ctx->devid);
goto jfr_pool_free;
}
ret = memcpy_s((void *)ctx->jfr_pool[d2d_flag], sizeof(struct que_jfr_pool_info) * QUE_PKT_SEND_JETTY_POOL_DEPTH,
(void *)jfr_pool, sizeof(struct que_jfr_pool_info) * QUE_PKT_SEND_JETTY_POOL_DEPTH);
if (que_unlikely(ret != EOK)) {
QUEUE_LOG_ERR("que jfr pool copy fail. (ret=%d; devid=%u)\n", ret, ctx->devid);
ret = DRV_ERROR_INNER_ERR;
goto uma_recv_free;
}
return DRV_ERROR_NONE;
uma_recv_free:
que_uma_recv_destroy_pool(QUE_PKT_SEND_JETTY_POOL_DEPTH, jfr_pool);
jfr_pool_free:
que_jfr_pool_free(QUE_PKT_SEND_JETTY_POOL_DEPTH, jfr_pool);
return ret;
}
static void que_jfr_pool_free_for_ack_recv(struct que_ctx *ctx, unsigned int d2d_flag)
{
que_uma_recv_destroy_pool(QUE_PKT_SEND_JETTY_POOL_DEPTH, ctx->jfr_pool[d2d_flag]);
que_jfr_pool_free(QUE_PKT_SEND_JETTY_POOL_DEPTH, ctx->jfr_pool[d2d_flag]);
}
static int que_jfs_jfr_pool_create_for_pkt_send_ack_recv(struct que_ctx *ctx, unsigned int d2d_flag)
{
int ret;
ret = que_jfs_pool_create_for_pkt_send(ctx, d2d_flag);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
return ret;
}
ret = que_jfr_pool_create_for_ack_recv(ctx, d2d_flag);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
que_jfs_pool_free_for_pkt_send(ctx, d2d_flag);
return ret;
}
return DRV_ERROR_NONE;
}
static void que_jfs_jfr_pool_free_for_pkt_send_ack_recv(struct que_ctx *ctx, unsigned int d2d_flag)
{
que_jfs_pool_free_for_pkt_send(ctx, d2d_flag);
que_jfr_pool_free_for_ack_recv(ctx, d2d_flag);
}
static int que_jfr_jfs_create_for_pkt_recv_ack_send(struct que_ctx *ctx, unsigned int d2d_flag)
{
struct que_jfs *ack_send_jetty = NULL;
struct que_jfr *pkt_recv_jetty = NULL;
struct que_recv_para *recv = NULL;
struct que_jfr_attr jfr_attr = {.jfr_depth = QUE_PKT_RECV_DEPTH, .jfc_r_depth = QUE_PKT_RECV_DEPTH};
struct que_jfs_attr jfs_attr = {.jfs_depth = QUE_ACK_SEND_DEPTH, .jfc_s_depth = QUE_ACK_SEND_DEPTH, .priority = QUE_JFS_HIGH_PRIORITY};
struct que_uma_recv_attr uma_recv_attr = {.num = QUE_PKT_RECV_DEPTH, .size = QUE_UMA_MAX_SEND_SIZE};
pkt_recv_jetty = que_jfr_create(ctx->devid, &jfr_attr,d2d_flag);
if (que_unlikely(pkt_recv_jetty == NULL)) {
QUEUE_LOG_ERR("que jfr create fail. (devid=%u; hospid=%d)\n", ctx->devid, ctx->hostpid);
return DRV_ERROR_INNER_ERR;
}
ack_send_jetty = que_jfs_create(ctx->devid, &jfs_attr, d2d_flag);
if (que_unlikely(ack_send_jetty == NULL)) {
QUEUE_LOG_ERR("Que jfs create fail. (devid=%u; hospid=%d)\n", ctx->devid, ctx->hostpid);
goto pkt_recv_jetty_free;
}
recv = que_uma_recv_create(ctx->devid, pkt_recv_jetty, &uma_recv_attr, d2d_flag);
if (que_unlikely(recv == NULL)) {
QUEUE_LOG_ERR("que pkt recv buff init fail. (devid=%u; hostpid=%d)\n", ctx->devid, ctx->hostpid);
goto ack_send_jetty_free;
}
ctx->ack_send_jetty[d2d_flag] = ack_send_jetty;
ctx->pkt_recv_jetty[d2d_flag] = pkt_recv_jetty;
ctx->recv_para[d2d_flag] = recv;
return DRV_ERROR_NONE;
ack_send_jetty_free:
que_jfs_destroy(ack_send_jetty);
pkt_recv_jetty_free:
que_jfr_destroy(pkt_recv_jetty);
return DRV_ERROR_INNER_ERR;
}
static void que_jfr_jfs_free_for_pkt_recv_ack_send(struct que_ctx *ctx, unsigned int d2d_flag)
{
if (que_likely(ctx->recv_para[d2d_flag] != NULL)) {
que_uma_recv_destroy(ctx->recv_para[d2d_flag]);
ctx->recv_para[d2d_flag] = NULL;
}
if (que_likely(ctx->ack_send_jetty[d2d_flag] != NULL)) {
que_jfs_destroy(ctx->ack_send_jetty[d2d_flag]);
ctx->ack_send_jetty[d2d_flag] = NULL;
}
if (que_likely(ctx->pkt_recv_jetty[d2d_flag] != NULL)) {
que_jfr_destroy(ctx->pkt_recv_jetty[d2d_flag]);
ctx->pkt_recv_jetty[d2d_flag] = NULL;
}
}
static int que_jfs_pool_create_for_data_rw(struct que_ctx *ctx, unsigned int d2d_flag)
{
int ret;
struct que_jfs_attr attr = {.jfs_depth = QUE_DATA_RW_JETTY_POOL_SEND_DEPTH,
.jfc_s_depth = QUE_DATA_RW_JETTY_POOL_JFC_DEPTH, .priority = QUE_JFS_MEDIUM_PRIORITY};
attr.spec_jfce_s = 1;
attr.jfce = ctx->pkt_recv_jetty[d2d_flag]->jfce_r;
ret = que_jfs_pool_init(ctx->devid, &attr, d2d_flag);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
QUEUE_LOG_ERR("que data rw jetty pool init fail. (ret=%d; devid=%u; hostpid=%d)\n", ret, ctx->devid, ctx->hostpid);
return ret;
}
ctx->jfc[d2d_flag] = attr.jfc;
return DRV_ERROR_NONE;
}
static void que_jfs_pool_free_for_data_rw(struct que_ctx *ctx, unsigned int d2d_flag)
{
ctx->jfc[d2d_flag] = NULL;
que_jfs_pool_uninit(ctx->devid, d2d_flag);
}
static int que_ctx_cr_token_init(struct que_ctx *ctx, unsigned int d2d_flag)
{
int ret;
ret = que_fill_ctx_token(ctx->devid, &ctx->token[d2d_flag], d2d_flag);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_ERR("que fill ctx token fail. (ret=%d; devid=%u)\n", ret, ctx->devid);
return ret;
}
urma_cr_t *cr = calloc(QUE_DATA_RW_JETTY_POOL_DEPTH, sizeof(urma_cr_t));
if (cr == NULL) {
QUEUE_LOG_ERR("que malloc cr fail. (cr_num=%u)\n", QUE_DATA_RW_JETTY_POOL_DEPTH);
return DRV_ERROR_OUT_OF_MEMORY;
}
ctx->cr[d2d_flag] = cr;
return DRV_ERROR_NONE;
}
static void que_ctx_cr_token_uninit(struct que_ctx *ctx, unsigned int d2d_flag)
{
if (que_likely(ctx->cr[d2d_flag] != NULL)) {
free(ctx->cr[d2d_flag]);
ctx->cr[d2d_flag] = NULL;
}
ctx->token[d2d_flag].token = 0;
}
static int que_ctx_event_res_init(struct que_ctx *ctx, unsigned int d2d_flag)
{
int ret;
struct event_res res;
int32_t event_type;
if (ctx->f2nf_res.res_alloc_flag == 1) {
return DRV_ERROR_NONE;
}
que_get_sched_event_type(ctx->devid, &event_type);
ret = esched_alloc_event_res(ctx->devid, event_type, &res);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
QUEUE_LOG_WARN("event resource can not alloc. (ret=%d; devid=%u; event_type=%u)\n", ret, ctx->devid, event_type);
return DRV_ERROR_NO_EVENT_RESOURCES;
}
ctx->f2nf_res.f2nf_event_res = res;
ctx->f2nf_res.pid = (uint32_t)getpid();
ctx->f2nf_res.dst_engine = que_get_sched_engine_type(ctx->devid);
ctx->f2nf_res.res_alloc_flag = 1;
return DRV_ERROR_NONE;
}
static void que_ctx_event_res_uninit(struct que_ctx *ctx, unsigned int d2d_flag)
{
if (que_likely(ctx->f2nf_res.res_alloc_flag == 1)) {
esched_free_event_res(ctx->devid, QUEUE_EVENT, &ctx->f2nf_res.f2nf_event_res);
ctx->f2nf_res.res_alloc_flag = 0;
}
}
#define MAX_RES_TYPE 5
typedef int (*que_ub_res_alloc)(struct que_ctx *ctx, unsigned int d2d_flag);
typedef void (*que_ub_res_free)(struct que_ctx *ctx, unsigned int d2d_flag);
que_ub_res_alloc g_ub_res_init[MAX_RES_TYPE] = {
que_jfs_jfr_pool_create_for_pkt_send_ack_recv,
que_jfr_jfs_create_for_pkt_recv_ack_send,
que_jfs_pool_create_for_data_rw,
que_ctx_cr_token_init,
que_ctx_event_res_init,
};
que_ub_res_free g_ub_res_uninit[MAX_RES_TYPE] = {
que_jfs_jfr_pool_free_for_pkt_send_ack_recv,
que_jfr_jfs_free_for_pkt_recv_ack_send,
que_jfs_pool_free_for_data_rw,
que_ctx_cr_token_uninit,
que_ctx_event_res_uninit,
};
static int que_ub_ctx_res_init(struct que_ctx *ctx, int res_type)
{
int ret;
ret = g_ub_res_init[res_type](ctx, TRANS_D2H_H2D);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
QUEUE_LOG_ERR("que ub ctx res init h2d d2h failed. (ret=%d; res_type=%d)\n", ret, res_type);
return ret;
}
#ifndef DRV_HOST
ret = g_ub_res_init[res_type](ctx, TRANS_D2D);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
QUEUE_LOG_ERR("que ub ctx res init d2d failed. (ret=%d; res_type=%d)\n", ret, res_type);
g_ub_res_uninit[res_type](ctx, TRANS_D2H_H2D);
return ret;
}
#endif
return DRV_ERROR_NONE;
}
static void que_ub_ctx_res_uninit(struct que_ctx *ctx, int res_type)
{
g_ub_res_uninit[res_type](ctx, TRANS_D2H_H2D);
#ifndef DRV_HOST
g_ub_res_uninit[res_type](ctx, TRANS_D2D);
#endif
}
int que_ctx_init(struct que_ctx *ctx)
{
int ret;
int res_type, res_type_free;
for (res_type = 0; res_type < MAX_RES_TYPE; res_type++) {
if (((ctx->res_bitmap >> res_type) & 0x1) == 0) {
continue;
}
ret = que_ub_ctx_res_init(ctx, res_type);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
goto out;
}
}
return DRV_ERROR_NONE;
out:
for (res_type_free = (res_type - 1); res_type_free >= 0; res_type_free--) {
if ((ctx->res_bitmap >> res_type_free) & 0x1) {
que_ub_ctx_res_uninit(ctx, res_type_free);
}
}
return ret;
}
void que_ctx_uninit(struct que_ctx *ctx)
{
int res_type;
if (que_likely(ctx != NULL)) {
for (res_type = (MAX_RES_TYPE - 1); res_type >= 0; res_type--) {
if ((ctx->res_bitmap >> res_type) & 0x1) {
que_ub_ctx_res_uninit(ctx, res_type);
}
}
}
}
int que_ctx_chan_check(unsigned int devid, unsigned int qid, unsigned long create_time)
{
struct que_ctx *ctx = NULL;
int ret;
ctx = que_ctx_get(devid);
if (que_unlikely(ctx == NULL)) {
return DRV_ERROR_QUEUE_NOT_INIT;
}
ret = que_chan_create_check(devid, qid, create_time);
que_ctx_put(ctx);
return ret;
}
int que_ctx_chan_create(unsigned int devid, unsigned int qid, QUEUE_CHAN_TYPE chan_type, unsigned long create_time, unsigned int d2d_flag)
{
struct que_ctx *ctx = NULL;
int ret;
ctx = que_ctx_get(devid);
if (que_unlikely(ctx == NULL)) {
return DRV_ERROR_QUEUE_NOT_INIT;
}
ret = que_chan_create(devid, qid, chan_type, create_time, d2d_flag);
que_ctx_put(ctx);
return ret;
}
int que_ctx_chan_update(unsigned int devid, unsigned int peer_devid, unsigned int qid, urma_jfr_id_t *tjfr_id, urma_token_t *token)
{
int ret;
unsigned int d2d_flag = 0;
struct que_ctx *ctx = NULL;
unsigned int urma_devid = que_get_urma_devid(devid, peer_devid);
que_get_d2d_flag(devid, peer_devid, &d2d_flag);
ret = que_ub_res_init(urma_devid);
if (que_unlikely((ret != DRV_ERROR_NONE) && (ret != DRV_ERROR_REPEATED_INIT))) {
QUEUE_LOG_ERR("que clt ub res init fail. (ret=%d; devid=%u)\n", ret, urma_devid);
return ret;
}
ctx = que_ctx_get(urma_devid);
if (que_unlikely(ctx == NULL)) {
return DRV_ERROR_QUEUE_NOT_INIT;
}
*token = ctx->token[d2d_flag];
ret = que_chan_update_jfs_info(devid, qid, ctx->jfs_pool[d2d_flag], ctx->pkt_recv_jetty[d2d_flag], tjfr_id, ctx->token[d2d_flag]);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
QUEUE_LOG_ERR("que chan update fail. (ret=%d; devid=%u; qid=%u; devpid=%d)\n", ret, devid, qid, ctx->devpid);
}
que_ctx_put(ctx);
return ret;
}
int que_ctx_get_f2nf_res(unsigned int devid, unsigned int qid, struct que_f2nf_res *f2nf_res)
{
struct que_ctx *ctx = NULL;
ctx = que_ctx_get(devid);
if (que_unlikely(ctx == NULL)) {
return DRV_ERROR_QUEUE_NOT_INIT;
}
*f2nf_res = ctx->f2nf_res;
que_ctx_put(ctx);
return DRV_ERROR_NONE;
}
int que_ctx_chan_destroy(unsigned int devid, unsigned int qid)
{
struct que_ctx *ctx = NULL;
int ret;
ctx = que_ctx_get(devid);
if (que_unlikely(ctx == NULL)) {
return DRV_ERROR_QUEUE_NOT_INIT;
}
ret = que_chan_destroy(devid, qid);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
QUEUE_LOG_ERR("que chan destroy fail. (ret=%d; devid=%u; qid=%u; devpid=%d)\n",
ret, devid, qid, ctx->devpid);
}
que_ctx_put(ctx);
return ret;
}
static int que_get_running_chan_list(unsigned int devid, struct que_query_alive_msg *qid_list)
{
int i;
int que_num = 0;
static unsigned int pre_check_qid_id = 0;
struct que_chan *chan = NULL;
if (pre_check_qid_id >= CLIENT_QID_OFFSET) {
pre_check_qid_id = 0;
}
for (i = pre_check_qid_id; i < CLIENT_QID_OFFSET; i++) {
chan = que_chan_get(devid, i);
if (que_unlikely(chan == NULL)) {
continue;
}
if (chan->chan_type != CHAN_ATTACH) {
que_chan_put(chan);
continue;
}
if (que_num >= QUE_MAX_QUE_LIST_NUM) {
que_chan_put(chan);
break;
}
qid_list->qid_list[que_num].qid = i;
qid_list->qid_list[que_num].alive = 1;
que_chan_put(chan);
que_num++;
}
pre_check_qid_id = i;
if (que_num == 0) {
return DRV_ERROR_QUEUE_EMPTY;
}
qid_list->num = que_num;
return DRV_ERROR_NONE;
}
void que_ctx_chan_recycle(unsigned int devid, struct que_query_alive_msg *qid_list)
{
int ret;
unsigned int qid, i;
qid_list->num =0;
ret = que_get_running_chan_list(devid, qid_list);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
return;
}
ret = que_clt_query_que_alive(devid, qid_list);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
return;
}
for (i = 0; i < qid_list->num; i++) {
if (qid_list->qid_list[i].alive) {
continue;
}
qid = qid_list->qid_list[i].qid;
if (qid >= CLIENT_QID_OFFSET) {
continue;
}
ret = que_ctx_chan_destroy(devid, qid);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
QUEUE_RUN_LOG_INFO("que destroy chan not success. (ret=%d; devid=%u; qid=%u)\n", ret, devid, qid);
}
}
return;
}
static ASYNC_QUE_INI_EVENT que_get_enque_event(int ret)
{
ASYNC_QUE_INI_EVENT event;
if (ret == DRV_ERROR_QUEUE_EMPTY) {
event = INI_ENQUE_EMPTY;
} else if (ret == DRV_ERROR_NONE) {
event = INI_ENQUE_NORMAL;
} else {
event = INI_ENQUE_ERROR;
}
return event;
}
int que_ctx_async_ini(unsigned int devid, unsigned int qid, void *mbuf)
{
struct que_ctx *ctx = NULL;
ASYNC_QUE_INI_EVENT event;
bool ini_try_flag;
int ret;
ctx = que_ctx_get(devid);
if (que_unlikely(ctx == NULL)) {
return DRV_ERROR_QUEUE_NOT_INIT;
}
ret = que_chan_async_pre_proc(devid, qid, mbuf);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
que_ctx_put(ctx);
return ret;
}
ini_try_flag = que_chan_update_ini_status(devid, qid, INI_ENQUE_TRY);
if (ini_try_flag) {
ret = que_chan_inter_dev_ini_proc(devid, qid);
event = que_get_enque_event(ret);
(void)que_chan_update_ini_status(devid, qid, event);
}
que_ctx_put(ctx);
return ret;
}
static void que_ctx_ack_recv(unsigned int urma_devid, urma_cr_t *cr)
{
int result, ret;
unsigned int qid, ini_try_flag;
ASYNC_QUE_INI_EVENT event;
que_ack_data ack_data = {0};
unsigned int devid = que_get_chan_devid(urma_devid);
ack_data.imm_data = cr->imm_data;
result = ack_data.ack_msg.result;
qid = queue_get_actual_qid(ack_data.ack_msg.qid);
que_chan_done(devid, qid, ASYNC_ENQUE);
if ((cr->status != URMA_CR_SUCCESS) || ((result != DRV_ERROR_NONE) && (result != DRV_ERROR_QUEUE_FULL))) {
QUEUE_LOG_ERR("que enque ack error. (cr_status=%d; ret=%d; devid=%u; urma_devid=%u; qid=%u; immdata=%llu)\n",
cr->status, result, devid, urma_devid, qid, ack_data.imm_data);
event = INI_ACK_ERROR;
} else if (result == DRV_ERROR_QUEUE_FULL) {
event = INI_ACK_FULL;
} else {
event = INI_ACK_NORMAL;
}
ini_try_flag = que_chan_update_ini_status(devid, qid, event);
if (ini_try_flag) {
ret = que_chan_inter_dev_ini_proc(devid, qid);
event = que_get_enque_event(ret);
(void)que_chan_update_ini_status(devid, qid, event);
}
}
static unsigned long _que_uma_recv_get_addr(struct que_ctx *ctx, unsigned int offset, unsigned int d2d_flag)
{
return ctx->recv_para[d2d_flag]->addr + offset * ctx->recv_para[d2d_flag]->size;
}
static int que_ctx_recv_proc(struct que_ctx *ctx, urma_cr_t *cr, unsigned int d2d_flag)
{
int ret = DRV_ERROR_NONE;
struct que_pkt *pkt = NULL;
if (que_unlikely((cr->user_ctx >= ctx->recv_para[d2d_flag]->num))) {
QUEUE_LOG_ERR("invalid offset. (offset=%u; num=%u; recv_size=%ld)\n",
cr->user_ctx, ctx->recv_para[d2d_flag]->num, ctx->recv_para[d2d_flag]->size);
return DRV_ERROR_PARA_ERROR;
}
pkt = (struct que_pkt *)_que_uma_recv_get_addr(ctx, cr->user_ctx, d2d_flag);
if (cr->opcode == URMA_CR_OPC_SEND_WITH_IMM) {
que_ctx_ack_recv(ctx->devid, cr);
} else {
if (que_unlikely(cr->status != URMA_CR_SUCCESS)) {
QUEUE_LOG_ERR("invalid cr status. (status=%d)\n", cr->status);
goto recv_put;
}
ret = que_chan_tgt_recv(ctx->devid, ctx->ack_send_jetty[d2d_flag], pkt, d2d_flag);
}
recv_put:
que_uma_recv_put_addr(ctx->pkt_recv_jetty[d2d_flag], ctx->recv_para[d2d_flag], (uintptr_t)pkt);
return ret;
}
typedef enum {
JFC_DATA_RW,
JFC_PKT_RECV,
JFC_TYPE_BUTT,
} jfc_type;
static jfc_type que_jfc_id_check(urma_jfc_t *jfc_data_rw, urma_jfc_t *jfc_pkt_recv, urma_jfc_t *jfc)
{
urma_jfc_id_t jfc_id_data_rw = jfc_data_rw->jfc_id;
urma_jfc_id_t jfc_id_pkt_recv = jfc_pkt_recv->jfc_id;
urma_jfc_id_t jfc_id = jfc->jfc_id;
if ((jfc_id_data_rw.id == jfc_id.id) && (jfc_id_data_rw.uasid == jfc_id.uasid)) {
return JFC_DATA_RW;
}
if ((jfc_id_pkt_recv.id == jfc_id.id) && (jfc_id_pkt_recv.uasid == jfc_id.uasid)) {
return JFC_PKT_RECV;
}
return JFC_TYPE_BUTT;
}
static int que_ctx_tgt_proc(struct que_ctx *ctx, urma_jfc_t *jfc, unsigned int d2d_flag)
{
int ret;
jfc_type cur_jfc_type;
unsigned int idle_jetty_num;
unsigned int cnt, cr_idx, cr_num;
static unsigned int next_rearm_flag = 0;
unsigned int cur_rearm_flag = 1;
urma_cr_t *cr = ctx->cr[d2d_flag];
cur_jfc_type = que_jfc_id_check(ctx->jfc[d2d_flag], ctx->pkt_recv_jetty[d2d_flag]->jfc_r, jfc);
if (cur_jfc_type == JFC_TYPE_BUTT) {
return DRV_ERROR_INVALID_VALUE;
}
QUEUE_LOG_DEBUG("que rcv new jfc. (devid=%u; cur_jfc_type=%d)\n", ctx->devid, cur_jfc_type);
if (cur_jfc_type == JFC_DATA_RW) {
ret = que_uma_poll_send_jfc(jfc, QUE_DATA_RW_JETTY_POOL_DEPTH, cr, &cnt);
if (ret == DRV_ERROR_NONE) {
for (cr_idx = 0; cr_idx < cnt; cr_idx++) {
ATOMIC_INC((volatile int *)&ctx->cnt[RECV_CQE_TOTAL]);
ret = que_chan_tgt_data_read_and_ack(&cr[cr_idx]);
if (ret != DRV_ERROR_NONE) {
ATOMIC_INC((volatile int *)&ctx->cnt[RECV_CQE_FAIL]);
QUEUE_LOG_ERR("que chan enque fail. (ret=%d; hostpid=%d; cr_idx=%d)\n", ret, ctx->hostpid, cr_idx);
continue;
}
}
ATOMIC_INC((volatile int *)&ctx->cnt[POLL_SEND_JFC_SUCCESS]);
} else {
ATOMIC_INC((volatile int *)&ctx->cnt[POLL_SEND_JFC_FAIL]);
}
idle_jetty_num = que_idle_jetty_find(ctx->devid, d2d_flag);
if ((idle_jetty_num) && (next_rearm_flag == 1)) {
ret = que_uma_rearm_jfc(ctx->pkt_recv_jetty[d2d_flag]->jfc_r);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_ERR("que rearm jfc fail. (ret=%d)\n", ret);
}
next_rearm_flag = 0;
}
} else {
cr_num = que_idle_jetty_find(ctx->devid, d2d_flag);
if (cr_num == 0) {
cur_rearm_flag = 0;
next_rearm_flag = 1;
goto out;
}
ret = que_uma_poll_send_jfc(jfc, cr_num, cr, &cnt);
if (ret == DRV_ERROR_NONE) {
for (cr_idx = 0; cr_idx < cnt; cr_idx++) {
ATOMIC_INC((volatile int *)&ctx->cnt[RECV_CQE_TOTAL]);
ret = que_ctx_recv_proc(ctx, &cr[cr_idx], d2d_flag);
if (ret != DRV_ERROR_NONE) {
ATOMIC_INC((volatile int *)&ctx->cnt[RECV_CQE_FAIL]);
QUEUE_LOG_ERR("que chan enque fail. (ret=%d; hostpid=%d; cr_idx=%d)\n", ret, ctx->hostpid, cr_idx);
continue;
}
}
ATOMIC_INC((volatile int *)&ctx->cnt[POLL_SEND_JFC_SUCCESS]);
} else {
ATOMIC_INC((volatile int *)&ctx->cnt[POLL_SEND_JFC_FAIL]);
}
}
out:
que_uma_ack_jfc(jfc, 1);
if (cur_rearm_flag) {
ret = que_uma_rearm_jfc(jfc);
if (ret != DRV_ERROR_NONE) {
QUEUE_LOG_ERR("que rearm jfc fail. (ret=%d)\n", ret);
}
ATOMIC_INC((volatile int *)&ctx->cnt[POLL_SEND_JFC_SUCCESS]);
} else {
ATOMIC_INC((volatile int *)&ctx->cnt[POLL_SEND_JFC_FAIL]);
}
return DRV_ERROR_NONE;
}
static int _que_ctx_poll(struct que_ctx *ctx, unsigned int d2d_flag)
{
int ret;
urma_jfc_t *jfc = NULL;
do {
ret = que_uma_wait_jfc(ctx->pkt_recv_jetty[d2d_flag]->jfce_r, QUE_URMA_WAIT_TIME, &jfc);
if (ret == DRV_ERROR_NONE) {
ATOMIC_INC((volatile int *)&ctx->cnt[WAIT_JFC_SUCCESS]);
ret = que_ctx_tgt_proc(ctx, jfc, d2d_flag);
if (ret != DRV_ERROR_NONE) {
break;
}
} else {
ATOMIC_INC((volatile int *)&ctx->cnt[WAIT_JFC_FAIL]);
break;
}
} while (1);
return (ret == DRV_ERROR_WAIT_TIMEOUT) ? DRV_ERROR_NONE : ret;
}
void que_ctx_poll(unsigned int devid, unsigned int d2d_flag)
{
struct que_ctx *ctx = NULL;
int ret;
ctx = que_ctx_get(devid);
if (que_unlikely(ctx == NULL)) {
QUEUE_LOG_ERR("que ctx get fail. (devid=%u)\n", devid);
return;
}
ret = _que_ctx_poll(ctx, d2d_flag);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
QUEUE_LOG_ERR("que ctx chan poll fail. (ret=%d; devid=%u; hostpid=%d)\n", ret, ctx->devid, ctx->hostpid);
}
que_ctx_put(ctx);
}
static void que_ctx_f2nf_recv(unsigned int devid, unsigned int vqid, struct que_ctx *ctx)
{
int ret;
bool ini_try_flag;
ASYNC_QUE_INI_EVENT event;
unsigned int qid = queue_get_actual_qid(vqid);
ini_try_flag = que_chan_update_ini_status(devid, qid, INI_RECV_F2NF);
if (ini_try_flag) {
ret = que_chan_inter_dev_ini_proc(devid, qid);
event = que_get_enque_event(ret);
(void)que_chan_update_ini_status(devid, qid, event);
}
}
void que_ctx_wait_f2nf(unsigned int devid)
{
struct que_ctx *ctx = NULL;
struct event_res *e_res = NULL;
struct event_info back_event_info = {0};
struct event_proc_result *result = NULL;
int ret;
ctx = que_ctx_get(devid);
if (que_unlikely(ctx == NULL)) {
QUEUE_LOG_ERR("que ctx get fail. (devid=%u)\n", devid);
return;
}
if (ctx->f2nf_res.res_alloc_flag == 0) {
QUEUE_LOG_ERR("que ctx event res not alloc. (devid=%u)\n", devid);
goto out;
}
e_res = &ctx->f2nf_res.f2nf_event_res;
while (1) {
ret = halEschedWaitEvent(devid, e_res->gid, e_res->tid, -1, &back_event_info);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
QUEUE_LOG_WARN("halEschedWaitEvent invalid. (ret=%d; event_id=%u; gid=%u; tid=%u)\n", ret,
e_res->event_id, e_res->gid, e_res->tid);
continue;
}
result = (struct event_proc_result *)back_event_info.priv.msg;
if ((back_event_info.priv.msg_len == sizeof(struct event_proc_result)) && (result->ret == QUEUE_IS_CLEAR_MAGIC)) {
goto out;
}
que_ctx_f2nf_recv(devid, back_event_info.comm.subevent_id, ctx);
}
out:
que_ctx_put(ctx);
}
int que_ctx_bulid_f2nf_event(unsigned int devid)
{
struct que_ctx *ctx = NULL;
struct que_f2nf_res *f2nf_res = NULL;
struct event_summary back_event = {0};
struct event_proc_result rsp = {0};
int ret;
unsigned int dst_devid = (devid == halGetHostDevid()) ? devid : SCHED_INVALID_DEVID;
ctx = que_ctx_get(devid);
if (que_unlikely(ctx == NULL)) {
return DRV_ERROR_UNINIT;
}
f2nf_res = &ctx->f2nf_res;
back_event.dst_engine = f2nf_res->dst_engine;
back_event.policy = ONLY;
back_event.pid = f2nf_res->pid;
back_event.grp_id = f2nf_res->f2nf_event_res.gid;
back_event.event_id = f2nf_res->f2nf_event_res.event_id;
back_event.subevent_id = f2nf_res->f2nf_event_res.subevent_id;
back_event.msg_len = (unsigned int)sizeof(struct event_proc_result);
back_event.msg = (char *)&rsp;
back_event.tid = f2nf_res->f2nf_event_res.tid;
rsp.ret = QUEUE_IS_CLEAR_MAGIC;
ret = halEschedSubmitEventEx(devid, dst_devid, &back_event);
que_ctx_put(ctx);
return ret;
}
void que_ctx_cnt_info(unsigned int devid)
{
struct que_ctx *ctx = NULL;
ctx = que_ctx_get(devid);
if (que_unlikely(ctx == NULL)) {
QUEUE_LOG_ERR("que ctx get fail. (devid=%u)\n", devid);
return;
}
QUEUE_RUN_LOG_INFO("que ctx recv_cqe_total_cnt=%u, recv_cqe_fail_cnt=%u, wait_jfc_succ_cnt=%u, wait_jfc_fail_cnt=%u, "
"poll_send_jfc_succ_cnt=%u, poll_send_jfc_fail_cnt=%u. (devid=%u)\n", ctx->cnt[RECV_CQE_TOTAL],
ctx->cnt[RECV_CQE_FAIL], ctx->cnt[WAIT_JFC_SUCCESS], ctx->cnt[WAIT_JFC_FAIL], ctx->cnt[POLL_SEND_JFC_SUCCESS],
ctx->cnt[POLL_SEND_JFC_FAIL], devid);
que_ctx_put(ctx);
}
static int __attribute__((constructor)) que_comm_ctx_init(void)
{
struct que_chan_ctx_agent_list *list = que_get_chan_ctx_agent();
list->que_ctx_cnt_info_print = que_ctx_cnt_info;
return DRV_ERROR_NONE;
}
#else
void que_comm_ctx_emu_test(void)
{
}
#endif