* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
#include <stdlib.h>
#ifndef EMU_ST
#include "svm_user_interface.h"
#endif
#include "queue.h"
#include "que_ub_msg.h"
#include "que_uma.h"
#include "que_compiler.h"
#include "queue_interface.h"
#include "que_comm_agent.h"
#include "queue_agent.h"
#include "que_jetty.h"
#include "que_mem_merge.h"
#include "que_ini_proc.h"
#define US_PER_SECOND 1000000
#ifndef EMU_ST
static int g_ini_level = 0;
static uint64_t ini_base_time[MAX_DEVICE] = {0};
int que_get_ini_log_level(void)
{
return g_ini_level;
}
uint64_t que_get_ini_basetime(unsigned int devid)
{
return ini_base_time[devid];
}
void que_update_ini_basetime(unsigned int devid)
{
uint64_t cur_time = que_get_cur_time_ns();
ini_base_time[devid] = cur_time;
}
void que_ini_time_stamp(struct que_ini_proc *ini_proc, QUE_TRACE_INI_TIMESTAMP type)
{
int ini_log_level = que_get_ini_log_level();
unsigned int num = TRACE_INI_LEVLE0_BUTT + ini_proc->total_iovec_num * (TRACE_INI_LEVLE1_BUTT - TRACE_INI_LEVLE1_START) * (unsigned int)ini_log_level;
if ((unsigned int)type >= num) {
return;
}
if (ini_proc->timestamp == NULL) {
return;
}
ini_proc->timestamp[type] = que_get_cur_time_ns();
}
struct que_ini_proc *que_ini_proc_create()
{
struct que_ini_proc *ini_proc = NULL;
ini_proc = (struct que_ini_proc *)calloc(QUEUE_ENQUE_BUTT, sizeof(struct que_ini_proc));
if (que_unlikely(ini_proc == NULL)) {
return NULL;
}
return ini_proc;
}
void que_ini_proc_destroy(struct que_ini_proc *ini_proc)
{
free(ini_proc);
}
struct que_pkt *que_tx_get_first_pkt(struct que_tx *tx)
{
unsigned long pkt_base = (unsigned long)(uintptr_t)tx->pkt;
return (struct que_pkt *)(pkt_base);
}
struct que_pkt *que_tx_get_other_pkts(struct que_tx *tx)
{
unsigned long pkt_base = (unsigned long)(uintptr_t)tx->pkt;
return (struct que_pkt *)(pkt_base + QUE_UMA_MAX_SEND_SIZE);
}
static inline QUE_MEM_TYPE que_get_buff_memtype(struct buff_iovec *vector)
{
#ifdef DRV_HOST
struct DVattribute attr;
drvError_t ret;
DVdeviceptr vptr = (DVdeviceptr)(uintptr_t)vector->ptr[0].iovec_base;
ret = drvMemGetAttribute(vptr, &attr);
if (ret != DRV_ERROR_NONE) {
return MEM_NOT_SVM;
} else {
if (attr.memType == DV_MEM_LOCK_DEV) {
return MEM_DEVICE_SVM;
} else if (attr.memType == DV_MEM_USER_MALLOC) {
return MEM_NOT_SVM;
} else {
return MEM_OTHERS_SVM;
}
}
#else
return MEM_NOT_SVM;
#endif
}
static int que_tx_first_iovec_num_get(QUE_MEM_TYPE mem_type, struct buff_iovec *vector, unsigned int *iovec_num,
bool *default_wr_flag)
{
unsigned int i;
unsigned int max_node_num_in_pkt = _que_get_node_num();
unsigned long long pkt_read_wr_num, wr_num_tmp, total_rw_wr_num = 0;
unsigned long long max_pkt_size = _que_get_pkt_size(vector->count);
*iovec_num = que_min(max_node_num_in_pkt, vector->count);
if (mem_type == MEM_DEVICE_SVM) {
return DRV_ERROR_NONE;
}
pkt_read_wr_num = que_align_up(max_pkt_size, QUE_URMA_MAX_SIZE) / QUE_URMA_MAX_SIZE;
if ((pkt_read_wr_num + 1) > QUE_MAX_RW_WR_NUM) {
QUEUE_LOG_ERR("que tx fill iovec size over limit. (max_pkt_size=%llu; pkt_read_wr_num=%llu)\n",
max_pkt_size, pkt_read_wr_num);
return DRV_ERROR_PARA_ERROR;
}
total_rw_wr_num = pkt_read_wr_num + (vector->context_len != 0);
for (i = 0; i < *iovec_num; i++) {
wr_num_tmp = que_align_up(vector->ptr[i].len, QUE_URMA_MAX_SIZE) / QUE_URMA_MAX_SIZE;
if (((total_rw_wr_num + wr_num_tmp) > QUE_MAX_RW_WR_NUM) ||
((total_rw_wr_num + wr_num_tmp) <= total_rw_wr_num)) {
break;
}
total_rw_wr_num += wr_num_tmp;
}
*default_wr_flag = (total_rw_wr_num <= QUE_DEFAULT_RW_WR_NUM) ? true : false;
*iovec_num = i;
return DRV_ERROR_NONE;
}
static struct que_tx *_que_tx_create(struct que_ini_proc *ini_proc, struct buff_iovec *vector)
{
unsigned int first_iovec_num;
unsigned int iovec_num = vector->count;
QUE_MEM_TYPE mem_type = (ini_proc->que_type == ASYNC_ENQUE) ? MEM_NOT_SVM : que_get_buff_memtype(vector);
unsigned long long iovec_size_temp, total_iovec_size = 0;
bool default_wr_flag = true;
struct que_tx *tx = NULL;
unsigned int i;
tx = (struct que_tx *)calloc(1, sizeof(struct que_tx));
if (que_unlikely(tx == NULL)) {
QUEUE_LOG_ERR("que tx alloc fail. (devid=%u; qid=%u; size=%ld)\n",
ini_proc->devid, ini_proc->qid, sizeof(struct que_tx));
return NULL;
}
for (i = 0; i < iovec_num; i++) {
iovec_size_temp = total_iovec_size + vector->ptr[i].len;
if (total_iovec_size < iovec_size_temp) {
total_iovec_size = iovec_size_temp;
} else {
QUEUE_LOG_ERR("que tx fill iovec size over limit. (devid=%u; qid=%u; iovec_size_temp=%llu; total_iovec_size=%llu)\n",
ini_proc->devid, ini_proc->qid, iovec_size_temp, total_iovec_size);
free(tx);
return NULL;
}
}
int ret = que_tx_first_iovec_num_get(mem_type, vector, &first_iovec_num, &default_wr_flag);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
free(tx);
return NULL;
}
tx->mem_type = mem_type;
tx->que_type = ini_proc->que_type;
tx->total_iovec_num = iovec_num;
tx->pkt_size = _que_get_pkt_size(iovec_num) + QUE_UMA_MAX_SEND_SIZE;
tx->total_iovec_size = total_iovec_size;
tx->first_iovec_num = first_iovec_num;
tx->remain_iovec_num = iovec_num - first_iovec_num;
tx->default_wr_flag = default_wr_flag;
tx->pkt_timestamp = que_get_cur_time_ns();
ini_proc->sn = ini_proc->sn + 1;
tx->sn = ini_proc->sn;
return tx;
}
static void _que_tx_destroy(struct que_tx *tx)
{
free(tx);
}
static int _que_tx_pkt_init(struct que_ini_proc *ini_proc, struct que_tx *tx)
{
struct que_pkt *pkt = NULL;
int ret;
urma_target_seg_t *tseg = NULL;
unsigned int urma_devid = que_get_urma_devid(ini_proc->devid, ini_proc->peer_devid);
unsigned int access = URMA_ACCESS_READ;
que_ini_time_stamp(ini_proc, TRACE_PKT_SEG_CREATE_START);
ret = que_mem_alloc((void **)(&pkt), tx->pkt_size);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
QUEUE_LOG_ERR("que pkt mem alloc fail. (devid=%u; qid=%u; size=%llu)\n", ini_proc->devid, ini_proc->qid,
tx->pkt_size);
return DRV_ERROR_OUT_OF_MEMORY;
}
if (que_is_share_mem((unsigned long long)(uintptr_t)pkt) == false) {
tseg = que_pin_seg_create(urma_devid, (unsigned long long)(uintptr_t)pkt, tx->pkt_size, access, &ini_proc->token_info, ini_proc->d2d_flag);
} else {
tseg = que_get_urma_ctx_tseg(urma_devid, ini_proc->d2d_flag);
}
if (que_unlikely(tseg == NULL)) {
que_mem_free(pkt);
QUEUE_LOG_ERR("que seg create fail. (devid=%u; urma_devid=%u; qid=%u; size=%llu)\n", ini_proc->devid, urma_devid,
ini_proc->qid, tx->pkt_size);
return DRV_ERROR_QUEUE_INNER_ERROR;
}
que_ini_time_stamp(ini_proc, TRACE_PKT_SEG_CREATE_END);
tx->pkt = pkt;
tx->pkt_tseg = tseg;
return DRV_ERROR_NONE;
}
static void _que_tx_pkt_uninit(struct que_tx *tx)
{
unsigned long long addr = (unsigned long long)(uintptr_t)tx->pkt;
if (que_likely(tx->pkt_tseg != NULL)) {
if (que_is_share_mem(addr) == false) {
que_seg_destroy(tx->pkt_tseg);
tx->pkt_tseg = NULL;
}
}
if (que_likely(tx->pkt != NULL)) {
que_mem_free(tx->pkt);
tx->pkt = NULL;
}
}
static int _que_tx_vector_merge(unsigned int urma_devid, struct que_ini_proc *ini_proc, struct que_tx *tx, struct buff_iovec *vector)
{
int ret;
unsigned int i, j;
unsigned long long va, size;
if (vector->context_len != 0) {
ret = que_mem_merge(&ini_proc->mem_ctx, urma_devid, (unsigned long long)(uintptr_t)vector->context_base, vector->context_len);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
QUEUE_LOG_ERR("que ctx merge fail. (devid=%u; urma_devid=%u; qid=%u; va=0x%llx; len=%llu)\n",
ini_proc->devid, urma_devid, ini_proc->qid, (unsigned long long)(uintptr_t)vector->context_base,
vector->context_len);
return ret;
}
}
if (tx->mem_type == MEM_DEVICE_SVM) {
return DRV_ERROR_NONE;
}
for (i = 0; i < tx->total_iovec_num; i++) {
va = (unsigned long long)(uintptr_t)vector->ptr[i].iovec_base;
size = vector->ptr[i].len;
ret = que_mem_merge(&ini_proc->mem_ctx, urma_devid, va, size);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
QUEUE_LOG_ERR("que ctx seg init fail. (ret=%d; devid=%u; qid=%u)\n", ret, ini_proc->devid, ini_proc->qid);
goto err_out;
}
}
return DRV_ERROR_NONE;
err_out:
for (j = 0; j < i; j++) {
va = (unsigned long long)(uintptr_t)vector->ptr[j].iovec_base;
que_mem_seg_unregister(&ini_proc->mem_ctx, urma_devid, va);
}
que_mem_seg_unregister(&ini_proc->mem_ctx, urma_devid, (unsigned long long)(uintptr_t)vector->context_base);
return ret;
}
static int _que_tx_ctx_seg_init(unsigned int urma_devid, struct que_ini_proc *ini_proc, struct que_tx *tx, struct buff_iovec *vector)
{
urma_target_seg_t *tseg = NULL;
unsigned int access;
if (vector->context_len != 0) {
access = (ini_proc->que_type == H2D_SYNC_DEQUE) ? DEQUE_INI_ACCESS : ENQUE_INI_ACCESS;
que_ini_time_stamp(ini_proc, TRACE_CTX_SEG_CREATE_START);
tseg = que_mem_seg_register(&ini_proc->mem_ctx, ini_proc->d2d_flag, &ini_proc->token_info, QUE_PIN, urma_devid,
(unsigned long long)(uintptr_t)vector->context_base, access);
if (que_unlikely(tseg == NULL)) {
QUEUE_LOG_ERR("que ctx seg create fail. (devid=%u; urma_devid=%u; qid=%u; va=0x%llx; len=%llu)\n",
ini_proc->devid, urma_devid, ini_proc->qid, (unsigned long long)(uintptr_t)vector->context_base,
vector->context_len);
return DRV_ERROR_QUEUE_INNER_ERROR;
}
que_ini_time_stamp(ini_proc, TRACE_CTX_SEG_CREATE_END);
}
tx->ctx_tseg = tseg;
return DRV_ERROR_NONE;
}
static void _que_tx_ctx_seg_uninit(unsigned int urma_devid, struct que_tx *tx)
{
if (que_likely(tx->ctx_tseg != NULL)) {
tx->ctx_tseg = NULL;
}
}
static int _que_tx_iovec_seg_init(unsigned int urma_devid, struct que_ini_proc *ini_proc, struct que_tx *tx, struct buff_iovec *vector)
{
urma_target_seg_t **tseg = NULL;
unsigned int i, j, access, pin_flg;
unsigned long long va, size;
if (tx->mem_type == MEM_DEVICE_SVM) {
tx->iovec_tseg = NULL;
return DRV_ERROR_NONE;
}
tseg = (urma_target_seg_t **)malloc(sizeof(urma_target_seg_t *) * tx->total_iovec_num);
if (que_unlikely(tseg == NULL)) {
QUEUE_LOG_ERR("que tx iovec tseg alloc fail. (devid=%u; qid=%u; total_iovec_num=%u)\n",
ini_proc->devid, ini_proc->qid, tx->total_iovec_num);
return DRV_ERROR_OUT_OF_MEMORY;
}
pin_flg = (tx->mem_type == MEM_OTHERS_SVM) ? QUE_NON_PIN : QUE_PIN;
access = (ini_proc->que_type == H2D_SYNC_DEQUE) ? DEQUE_INI_ACCESS : ENQUE_INI_ACCESS;
for (i = 0; i < tx->total_iovec_num; i++) {
va = (unsigned long long)(uintptr_t)vector->ptr[i].iovec_base;
size = vector->ptr[i].len;
que_ini_time_stamp(ini_proc, TRACE_INI_IOVEC_SEG_CREATE_START + i * (TRACE_INI_LEVLE1_BUTT - TRACE_INI_LEVLE1_START));
tseg[i] = que_mem_seg_register(&ini_proc->mem_ctx, ini_proc->d2d_flag, &ini_proc->token_info, pin_flg, urma_devid, va, access);
if (que_unlikely(tseg[i] == NULL)) {
QUEUE_LOG_ERR("que seg create fail. (devid=%u; urma_devid=%u; qid=%u; mem_type=%d; va=0x%llx; size=%llu; i=%u; iovec_num=%u)\n",
ini_proc->devid, urma_devid, ini_proc->qid, tx->mem_type, va, size, i, tx->total_iovec_num);
goto err_out;
}
que_ini_time_stamp(ini_proc, TRACE_INI_IOVEC_SEG_CREATE_END + i * (TRACE_INI_LEVLE1_BUTT - TRACE_INI_LEVLE1_START));
}
tx->iovec_tseg = tseg;
return DRV_ERROR_NONE;
err_out:
for (j = 0; j < i; j++) {
va = (unsigned long long)(uintptr_t)vector->ptr[j].iovec_base;
que_mem_seg_unregister(&ini_proc->mem_ctx, urma_devid, va);
}
free(tseg);
return DRV_ERROR_QUEUE_INNER_ERROR;
}
static void _que_tx_iovec_seg_uninit(unsigned int urma_devid, struct que_tx *tx)
{
unsigned int i;
if (tx->iovec_tseg == NULL) {
return;
}
for (i = 0; i < tx->total_iovec_num; i++) {
tx->iovec_tseg[i] = NULL;
}
free(tx->iovec_tseg);
tx->iovec_tseg = NULL;
}
static int _que_tx_seg_init(struct que_ini_proc *ini_proc, struct que_tx *tx, struct buff_iovec *vector)
{
int ret;
unsigned int urma_devid = que_get_urma_devid(ini_proc->devid, ini_proc->peer_devid);
ret = _que_tx_vector_merge(urma_devid, ini_proc, tx, vector);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
QUEUE_LOG_ERR("que iovec merge fail. (ret=%d; devid=%u; qid=%u)\n", ret, ini_proc->devid, ini_proc->qid);
return ret;
}
ret = _que_tx_ctx_seg_init(urma_devid, ini_proc, tx, vector);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
QUEUE_LOG_ERR("que ctx seg init fail. (ret=%d; devid=%u; qid=%u)\n", ret, ini_proc->devid, ini_proc->qid);
goto merge_err;
}
ret = _que_tx_iovec_seg_init(urma_devid, ini_proc, tx, vector);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
_que_tx_ctx_seg_uninit(urma_devid, tx);
QUEUE_LOG_ERR("que iovec seg init fail. (ret=%d; devid=%u; qid=%u)\n", ret, ini_proc->devid, ini_proc->qid);
goto merge_err;
}
return DRV_ERROR_NONE;
merge_err:
que_mem_erase_all_node(&ini_proc->mem_ctx);
return ret;
}
static void _que_tx_seg_uninit(unsigned int urma_devid, struct que_tx *tx, struct que_mem_merge_ctx *mem_ctx)
{
_que_tx_iovec_seg_uninit(urma_devid, tx);
_que_tx_ctx_seg_uninit(urma_devid, tx);
que_mem_erase_all_node(mem_ctx);
}
static struct que_tx *que_tx_create(struct que_ini_proc *ini_proc, struct buff_iovec *vector)
{
struct que_tx *tx = NULL;
int ret;
tx = _que_tx_create(ini_proc, vector);
if (que_unlikely(tx == NULL)) {
QUEUE_LOG_ERR("que tx create fail. (devid=%u; qid=%u)\n", ini_proc->devid, ini_proc->qid);
return NULL;
}
ret = _que_tx_pkt_init(ini_proc, tx);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
QUEUE_LOG_ERR("que tx pkt init fail. (ret=%d; devid=%u; qid=%u)\n", ret, ini_proc->devid, ini_proc->qid);
goto err_que_tx_pkt_init;
}
ret = _que_tx_seg_init(ini_proc, tx, vector);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
QUEUE_LOG_ERR("que tx iovec init fail. (ret=%d; devid=%u; qid=%u)\n", ret, ini_proc->devid, ini_proc->qid);
goto err_que_tx_iovec_init;
}
return tx;
err_que_tx_iovec_init:
_que_tx_pkt_uninit(tx);
err_que_tx_pkt_init:
_que_tx_destroy(tx);
return NULL;
}
static void que_tx_destroy(struct que_tx *tx, struct que_mem_merge_ctx *mem_ctx, unsigned int urma_devid)
{
_que_tx_seg_uninit(urma_devid, tx, mem_ctx);
_que_tx_pkt_uninit(tx);
_que_tx_destroy(tx);
}
static void que_tx_pkt_node_fill(struct que_tx *tx, struct buff_iovec *vector,
unsigned int copied_vec_num, struct que_pkt *pkt)
{
unsigned int node_id, iovec_id;
for (node_id = 0; node_id < pkt->iovec_node_num; node_id++) {
iovec_id = copied_vec_num + node_id;
pkt->iovec_node[node_id].va = (unsigned long long)vector->ptr[iovec_id].iovec_base;
pkt->iovec_node[node_id].size = vector->ptr[iovec_id].len;
if (tx->iovec_tseg != NULL) {
pkt->iovec_node[node_id].seg = tx->iovec_tseg[iovec_id]->seg;
}
}
}
static void que_tx_pkt_head_fill(unsigned int qid, unsigned int local_qid, unsigned int devid, struct que_tx *tx,
urma_token_t token, struct buff_iovec *vector, urma_jfr_id_t jfr_id, struct que_pkt *pkt)
{
uint64_t ini_basetime = que_get_ini_basetime(devid);
pkt->head.qid = qid;
pkt->head.peer_qid = local_qid;
pkt->head.total_iovec_num = tx->total_iovec_num;
pkt->head.total_iovec_size = tx->total_iovec_size;
pkt->head.mem_type = tx->mem_type;
pkt->head.que_type = tx->que_type;
pkt->head.ctx_node.va = (unsigned long long)(uintptr_t)vector->context_base;
pkt->head.ctx_node.size = vector->context_len;
pkt->head.jfr_id = jfr_id;
pkt->head.token = token;
if (tx->ctx_tseg != NULL) {
pkt->head.ctx_node.seg = tx->ctx_tseg->seg;
}
pkt->head.pkt_node.seg = tx->pkt_tseg->seg;
pkt->head.pkt_node.va = (unsigned long long)(uintptr_t)tx->pkt;
pkt->head.pkt_node.size = tx->pkt_size;
pkt->head.default_wr_flag = tx->default_wr_flag;
pkt->head.first_iovec_num = tx->first_iovec_num;
pkt->head.remain_iovec_num = tx->remain_iovec_num;
pkt->head.pkt_timestamp = tx->pkt_timestamp;
pkt->head.ini_base_timestamp = ini_basetime;
pkt->head.sn = tx->sn;
}
static void que_tx_pkt_fill(unsigned int peer_qid, unsigned int local_qid, unsigned int devid, struct que_tx *tx,
urma_token_t token, struct buff_iovec *vector, urma_jfr_id_t jfr_id)
{
unsigned int copied_vec_num = 0;
struct que_pkt *pkt = que_tx_get_first_pkt(tx);
que_tx_pkt_head_fill(peer_qid, local_qid, devid, tx, token, vector, jfr_id, pkt);
pkt->iovec_node_num = pkt->head.first_iovec_num;
que_tx_pkt_node_fill(tx, vector, copied_vec_num, pkt);
copied_vec_num += pkt->iovec_node_num;
if (copied_vec_num == vector->count) {
return;
}
pkt = que_tx_get_other_pkts(tx);
que_tx_pkt_head_fill(peer_qid, local_qid, devid, tx, token, vector, jfr_id, pkt);
pkt->iovec_node_num = pkt->head.remain_iovec_num;
que_tx_pkt_node_fill(tx, vector, copied_vec_num, pkt);
copied_vec_num += pkt->iovec_node_num;
}
static int _que_tx_pkt_post(struct que_ini_proc *ini_proc, struct que_tx *tx)
{
struct que_pkt *pkt = NULL;
urma_sge_t sge;
pkt = que_tx_get_first_pkt(tx);
sge.addr = (uint64_t)(uintptr_t)pkt;
sge.len = _que_get_pkt_size(pkt->iovec_node_num);
sge.tseg = tx->pkt_tseg;
return que_uma_send_post_and_wait(ini_proc->pkt_send_jetty, &sge, ini_proc->d2d_flag);
}
static int _que_tx_send(struct que_ini_proc *ini_proc, struct que_tx *tx)
{
int ret = _que_tx_pkt_post(ini_proc, tx);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
QUEUE_LOG_ERR("que tx pkt post fail. (ret=%d; devid=%u; qid=%u)\n", ret, ini_proc->devid, ini_proc->qid);
return ret;
}
return DRV_ERROR_NONE;
}
static int que_tx_send(struct que_ini_proc *ini_proc, struct que_tx *tx, struct buff_iovec *vector)
{
int ret;
int virqid = queue_get_virtual_qid(ini_proc->qid, LOCAL_QUEUE);
if (ini_proc->que_type != ASYNC_ENQUE) {
que_tx_pkt_fill(ini_proc->qid, ini_proc->qid, ini_proc->devid, tx, ini_proc->token_info.token,
vector, ini_proc->imm_recv_jetty->jfr->jfr_id);
} else {
que_tx_pkt_fill(ini_proc->peer_qid, virqid, ini_proc->devid, tx, ini_proc->token_info.token,
vector, ini_proc->qjfr->jfr->jfr_id);
}
que_ini_time_stamp(ini_proc, TRACE_PKT_FILL);
ret = _que_tx_send(ini_proc, tx);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
QUEUE_LOG_ERR("que tx send fail. (ret=%d; devid=%u; qid=%u)\n", ret, ini_proc->devid, ini_proc->qid);
}
que_ini_time_stamp(ini_proc, TRACE_TX_SEND);
return ret;
}
int que_ini_pkt_send(struct que_ini_proc *ini_proc, struct buff_iovec *vector)
{
struct que_tx *tx = NULL;
int ret;
tx = que_tx_create(ini_proc, vector);
if (que_unlikely(tx == NULL)) {
QUEUE_LOG_ERR("que tx create fail. (devid=%u; qid=%u)\n", ini_proc->devid, ini_proc->qid);
return DRV_ERROR_QUEUE_INNER_ERROR;
}
que_ini_time_stamp(ini_proc, TRACE_TX_CREATE);
ini_proc->tx = tx;
ini_proc->vector = vector;
ret = que_tx_send(ini_proc, tx, vector);
if (que_unlikely(ret != DRV_ERROR_NONE)) {
QUEUE_LOG_ERR("que tx send fail. (ret=%d; devid=%u; qid=%u)\n", ret, ini_proc->devid, ini_proc->qid);
return DRV_ERROR_QUEUE_INNER_ERROR;
}
return DRV_ERROR_NONE;
}
int que_ini_ack_wait(struct que_ini_proc *ini_proc, int timeout)
{
int ret;
int timeout_ms_ = timeout;
struct timeval start, end;
que_ack_data ack_data = {0};
que_ini_time_stamp(ini_proc, TRACE_ACK_WAIT_START);
wait:
que_get_time(&start);
ret = que_uma_imm_wait(ini_proc->imm_recv_jetty, ini_proc->recv_para, &ack_data.imm_data, timeout_ms_);
if (que_likely(ret == DRV_ERROR_NONE)) {
if ((ack_data.ack_msg.sn != ini_proc->tx->sn) ||
(ini_proc->qid != (unsigned int)ack_data.ack_msg.qid)) {
QUEUE_LOG_ERR("que ack error. (ret=%d; devid=%u; memtype=%d; exp_qid=%u; act_qid=%u,"
"exp_sn=%d, act_sn=%u)\n",
ret, ini_proc->devid, ini_proc->que_type, ini_proc->qid, ack_data.ack_msg.qid,
ini_proc->tx->sn, ack_data.ack_msg.sn);
que_get_time(&end);
queue_agent_update_time(start, end, &timeout_ms_);
goto wait;
}
}
if (que_unlikely(ret != DRV_ERROR_NONE)) {
QUEUE_LOG_ERR("que wait fail. (ret=%d; devid=%u; qid=%u; que_type=%d, timeout=%d)\n",
ret, ini_proc->devid, ini_proc->qid, (int)ini_proc->que_type, timeout);
return ret;
}
if (que_unlikely(ack_data.ack_msg.result == DRV_ERROR_TRANS_LINK_ACK_TIMEOUT_ERR)) {
QUEUE_LOG_WARN("The packet has already been received in tgt, continue wait ack.\n");
goto wait;
}
que_ini_time_stamp(ini_proc, TRACE_ACK_WAIT_END);
ini_proc->tgt_time = ack_data.ack_msg.tgt_time;
return ack_data.ack_msg.result;
}
void que_ini_proc_done(struct que_ini_proc *ini_proc)
{
unsigned int urma_devid = que_get_urma_devid(ini_proc->devid, ini_proc->peer_devid);
if (que_likely(ini_proc->tx != NULL)) {
que_tx_destroy(ini_proc->tx, &ini_proc->mem_ctx, urma_devid);
ini_proc->tx = NULL;
}
}
#else
void que_ini_proc_emu_test(void)
{
}
#endif