* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
#include "queue_wrapper.h"
#include <thread>
#include "common/inner_error_codes.h"
#include "common/udf_log.h"
namespace FlowFunc {
int32_t QueueWrapper::Enqueue(Mbuf *buf) {
drvError_t drv_ret = halQueueEnQueue(device_id_, queue_id_, buf);
if (drv_ret == DRV_ERROR_NONE) {
return HICAID_SUCCESS;
} else if (drv_ret == DRV_ERROR_QUEUE_FULL) {
HICAID_LOG_INFO("halQueueEnQueue not complete as queue is full, queueId=%u", queue_id_);
return HICAID_ERR_QUEUE_FULL;
} else {
HICAID_LOG_ERROR("halQueueEnQueue failed, drv_ret=%d, queueId=%u", static_cast<int32_t>(drv_ret), queue_id_);
return HICAID_ERR_QUEUE_FAILED;
}
}
int32_t QueueWrapper::Dequeue(Mbuf *&buf) {
HICAID_LOG_DEBUG("ready dequeue for queue[%u], deviceId=%u", queue_id_, device_id_);
void *tmp_buf = nullptr;
drvError_t ret = halQueueDeQueue(device_id_, queue_id_, &tmp_buf);
if (ret == DRV_ERROR_NONE) {
buf = static_cast<Mbuf *>(tmp_buf);
HICAID_LOG_DEBUG("dequeue for queue[%u] success", queue_id_);
return HICAID_SUCCESS;
} else if (ret == DRV_ERROR_QUEUE_EMPTY) {
HICAID_LOG_INFO("halQueueDeQueue get none data as queue is empty, queueId=%u", queue_id_);
return HICAID_ERR_QUEUE_EMPTY;
} else {
HICAID_LOG_ERROR("halQueueDeQueue failed, drv_ret=%d, queueId=%u", static_cast<int32_t>(ret), queue_id_);
return HICAID_ERR_QUEUE_FAILED;
}
}
int32_t QueueWrapper::QueryQueueSize() const {
QueueInfo queue_info{};
drvError_t drv_ret = halQueueQueryInfo(device_id_, queue_id_, &queue_info);
if (drv_ret != DRV_ERROR_NONE) {
HICAID_LOG_ERROR("halQueueQueryInfo failed, drv_ret=%d, deviceId=%u, queueId=%u", static_cast<int32_t>(drv_ret),
device_id_, queue_id_);
return 0;
}
return queue_info.size;
}
int32_t QueueWrapper::QueryQueueDepth() const {
int32_t depth = 0;
drvError_t drv_ret =
halQueueGetStatus(device_id_, queue_id_, QUERY_QUEUE_DEPTH, sizeof(int32_t), static_cast<void *>(&depth));
if (drv_ret != DRV_ERROR_NONE) {
HICAID_LOG_ERROR("halQueueGetStatus failed, drv_ret=%d, deviceId=%u, queueId=%u", static_cast<int32_t>(drv_ret),
device_id_, queue_id_);
return 0;
}
return depth;
}
int32_t QueueWrapper::DequeueWithTimeout(Mbuf *&buf, int32_t timeout) {
constexpr uint32_t max_wait_time = 3600 * 1000;
uint32_t wait_timeout = ((timeout < 0) || (static_cast<uint32_t>(timeout) > max_wait_time)) ?
max_wait_time : static_cast<uint32_t>(timeout);
uint32_t retry_interval = 10;
uint32_t retry_time = 0;
QUEUE_QUERY_ITEM query_item = QUERY_QUEUE_STATUS;
int32_t queue_status = static_cast<int32_t>(QUEUE_EMPTY);
do {
drvError_t drv_ret = halQueueGetStatus(device_id_, queue_id_, query_item, sizeof(queue_status), &queue_status);
if (drv_ret != DRV_ERROR_NONE) {
HICAID_LOG_ERROR("query queue status failed, drv_ret=%d, queue_id_=%u, query_item=%d, len=%zu",
static_cast<int32_t>(drv_ret), queue_id_, static_cast<int32_t>(query_item), sizeof(queue_status));
return HICAID_ERR_QUEUE_FAILED;
}
if (queue_status != static_cast<int32_t>(QUEUE_EMPTY)) {
HICAID_LOG_INFO("wait queue not empty finish, queue_id_=%u, queue_status=%d, retry_time=%u ms.", queue_id_,
queue_status, retry_time);
break;
}
if (retry_time >= wait_timeout) {
HICAID_LOG_ERROR("wait queue not empty timeout, queue_id_=%u, retry_time=%u ms", queue_id_, retry_time);
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(retry_interval));
retry_time += retry_interval;
} while (true);
return Dequeue(buf);
}
int32_t QueueWrapper::DiscardMbuffOnce() {
Mbuf *tmp_buf = nullptr;
const int32_t ret = Dequeue(tmp_buf);
if (ret == HICAID_SUCCESS) {
(void)halMbufFree(tmp_buf);
}
return ret;
}
int32_t QueueWrapper::DiscardMbuf() {
int32_t ret = HICAID_SUCCESS;
uint32_t count = 0U;
while (ret == HICAID_SUCCESS) {
ret = DiscardMbuffOnce();
if (ret == HICAID_ERR_QUEUE_EMPTY) {
break;
} else if (ret == HICAID_SUCCESS) {
++count;
continue;
} else {
HICAID_LOG_ERROR("Discard mbuff failed result of dequeue error.");
return HICAID_ERR_QUEUE_FAILED;
}
}
HICAID_LOG_INFO("Discard [%u] times data for queueId=%u.", count, queue_id_);
return HICAID_SUCCESS;
}
}