* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
#include "flow_model_impl.h"
#include <functional>
#include "common/udf_log.h"
#include "common/scope_guard.h"
#include "common/util.h"
#include "flow_func/mbuf_flow_msg.h"
#include "config/global_config.h"
#include "reader_writer/queue_wrapper.h"
#include "reader_writer/proxy_queue_wrapper.h"
#include "common/inner_error_codes.h"
namespace FlowFunc {
namespace {
constexpr int32_t kDequeWaitPerLoop = 1000;
constexpr int32_t kQueueAttachTimeout = 3 * 1000;
std::vector<int32_t> GetQueueSize(const std::vector<std::unique_ptr<QueueWrapper>> &queue_wrappers) {
std::vector<int32_t> queue_size_list(queue_wrappers.size());
for (size_t idx = 0; idx < queue_wrappers.size(); ++idx) {
queue_size_list[idx] = queue_wrappers[idx]->QueryQueueSize();
}
return queue_size_list;
}
}
thread_local bool FlowModelImpl::handle_event_ = false;
FlowModelImpl::FlowModelImpl(std::vector<QueueDevInfo> input_queue_infos, std::vector<QueueDevInfo> output_queue_infos,
std::unique_ptr<DataAligner> data_aligner)
: input_queue_infos_(std::move(input_queue_infos)), output_queue_infos_(std::move(output_queue_infos)),
data_aligner_(std::move(data_aligner)) {
}
int32_t FlowModelImpl::Init() {
invoke_model_sched_group_id_ = GlobalConfig::Instance().GetInvokeModelSchedGroupId();
input_queue_wrappers_.resize(input_queue_infos_.size());
output_queue_wrappers_.resize(output_queue_infos_.size());
for (size_t input_idx = 0; input_idx < input_queue_infos_.size(); ++input_idx) {
const auto &input_queue_info = input_queue_infos_[input_idx];
auto dev_ret = halQueueAttach(input_queue_info.device_id, input_queue_info.queue_id, kQueueAttachTimeout);
if (dev_ret != DRV_ERROR_NONE) {
UDF_LOG_ERROR("attached feed input queue[%u] failed, ret[%d], queueDeviceId[%u]", input_queue_info.queue_id,
static_cast<int32_t>(dev_ret), input_queue_info.device_id);
return FLOW_FUNC_ERR_QUEUE_ERROR;
}
if (input_queue_info.is_proxy_queue) {
input_queue_wrappers_[input_idx].reset(
new(std::nothrow) ProxyQueueWrapper(input_queue_info.device_id, input_queue_info.queue_id));
} else {
input_queue_wrappers_[input_idx].reset(
new(std::nothrow) QueueWrapper(input_queue_info.device_id, input_queue_info.queue_id));
}
if (input_queue_wrappers_[input_idx] == nullptr) {
UDF_LOG_ERROR("alloc queueWrapper failed, is_proxy_queue=%d, queue_id=%u.",
static_cast<int32_t>(input_queue_info.is_proxy_queue), input_queue_info.queue_id);
return FLOW_FUNC_FAILED;
}
}
QueueSetInput input;
QueueSetInputPara input_param;
for (size_t output_idx = 0; output_idx < output_queue_infos_.size(); ++output_idx) {
const auto &output_queue_info = output_queue_infos_[output_idx];
auto output_queue_id = output_queue_info.queue_id;
auto dev_ret = halQueueAttach(output_queue_info.device_id, output_queue_info.queue_id, kQueueAttachTimeout);
if (dev_ret != DRV_ERROR_NONE) {
UDF_LOG_ERROR("attached fetch output queue[%u] failed, ret[%d], queueDeviceId[%u]", output_queue_id,
static_cast<int32_t>(dev_ret), output_queue_info.queue_id);
return FLOW_FUNC_ERR_QUEUE_ERROR;
}
if (output_queue_info.is_proxy_queue) {
output_queue_wrappers_[output_idx].reset(
new(std::nothrow) ProxyQueueWrapper(output_queue_info.device_id, output_queue_id));
} else {
input.queSetWorkMode.qid = output_queue_info.queue_id;
input.queSetWorkMode.workMode = static_cast<uint32_t>(QUEUE_MODE_PULL);
input_param.inBuff = static_cast<void *>(&input);
input_param.inLen = static_cast<uint32_t>(sizeof(QueueSetInput));
(void)halQueueSet(output_queue_info.device_id, QUEUE_SET_WORK_MODE, &input_param);
output_queue_wrappers_[output_idx].reset(
new(std::nothrow) QueueWrapper(output_queue_info.device_id, output_queue_id));
}
if (output_queue_wrappers_[output_idx] == nullptr) {
UDF_LOG_ERROR("alloc output queueWrapper failed, is_proxy_queue=%d, queue_id=%u.",
static_cast<int32_t>(output_queue_info.is_proxy_queue), output_queue_id);
return FLOW_FUNC_FAILED;
}
}
return FLOW_FUNC_SUCCESS;
}
int32_t FlowModelImpl::Run(const std::vector<std::shared_ptr<FlowMsg>> &input_msgs,
std::vector<std::shared_ptr<FlowMsg>> &output_msgs, int32_t timeout) {
UDF_LOG_DEBUG("run flow model, input_msgs size=%zu, timeout=%d(ms)", input_msgs.size(), timeout);
if (input_msgs.size() != input_queue_infos_.size()) {
UDF_LOG_ERROR("input msgs num=%zu is not same as input queue num=%zu", input_msgs.size(),
input_queue_infos_.size());
return FLOW_FUNC_ERR_PARAM_INVALID;
}
if (data_aligner_ != nullptr) {
data_aligner_->Reset();
}
std::unique_lock<std::mutex> guard(model_mutex_);
for (size_t input_idx = 0UL; input_idx < input_queue_infos_.size(); ++input_idx) {
int32_t feed_ret = Feed(input_idx, input_msgs[input_idx], timeout);
if (feed_ret != FLOW_FUNC_SUCCESS) {
UDF_LOG_ERROR("feed flow msg failed, queue id=%u, input_idx=%zu", input_queue_infos_[input_idx].queue_id,
input_idx);
return feed_ret;
}
UDF_LOG_DEBUG("feed flow msg success, queue id=%u, input_idx=%zu", input_queue_infos_[input_idx].queue_id,
input_idx);
}
if (data_aligner_ != nullptr) {
return AlignFetch(output_msgs, timeout);
}
for (size_t output_idx = 0UL; output_idx < output_queue_infos_.size(); ++output_idx) {
std::shared_ptr<FlowMsg> flow_msg;
int32_t fetch_ret = Fetch(output_idx, flow_msg, timeout);
if (fetch_ret != FLOW_FUNC_SUCCESS) {
UDF_LOG_ERROR("fetch flow msg failed, queue id=%u, output_idx=%zu", output_queue_infos_[output_idx].queue_id,
output_idx);
return fetch_ret;
}
UDF_LOG_DEBUG("fetch flow msg success, queue id=%u, output_idx=%zu", output_queue_infos_[output_idx].queue_id,
output_idx);
output_msgs.emplace_back(flow_msg);
}
return FLOW_FUNC_SUCCESS;
}
int32_t FlowModelImpl::Feed(size_t input_idx, const std::shared_ptr<FlowMsg> &flow_msg, int32_t timeout) {
const auto &input_queue_info = input_queue_infos_[input_idx];
UDF_LOG_DEBUG("feed flow msg, input_idx=%zu, qid=%u, timeout=%d(ms).", input_idx, input_queue_info.queue_id, timeout);
auto mbuf_flow_msg = std::dynamic_pointer_cast<MbufFlowMsg>(flow_msg);
if (mbuf_flow_msg == nullptr) {
UDF_LOG_ERROR("not support custom define flow msg now, input_idx=%zu, qid=%u.", input_idx,
input_queue_info.queue_id);
return FLOW_FUNC_ERR_PARAM_INVALID;
}
current_trans_id_ = mbuf_flow_msg->GetTransactionId();
UDF_LOG_DEBUG("input_idx=%zu, qid=%u, feed msg=%s", input_idx, input_queue_info.queue_id,
mbuf_flow_msg->DebugString().c_str());
Mbuf *feed_mbuf = nullptr;
auto drv_int_ret = halMbufCopyRef(mbuf_flow_msg->GetMbuf(), &feed_mbuf);
if ((drv_int_ret != static_cast<int32_t>(DRV_ERROR_NONE)) || (feed_mbuf == nullptr)) {
UDF_LOG_ERROR("copy ref mbuf failed, input_idx=%zu, qid=%u.", input_idx, input_queue_info.queue_id);
return FLOW_FUNC_ERR_MEM_BUF_ERROR;
}
static auto mbuf_deleter = [](Mbuf *buf) { (void)halMbufFree(buf); };
std::unique_ptr<Mbuf, decltype(mbuf_deleter)> mbuf_guard(feed_mbuf, mbuf_deleter);
int32_t hicard_ret = input_queue_wrappers_[input_idx]->Enqueue(feed_mbuf);
if (hicard_ret == HICAID_SUCCESS) {
(void)mbuf_guard.release();
return FLOW_FUNC_SUCCESS;
}
if (hicard_ret == HICAID_ERR_QUEUE_FULL) {
UDF_LOG_ERROR("Enqueue failed as queue full, input_idx=%zu, qid=%u", input_idx, input_queue_info.queue_id);
return FLOW_FUNC_FAILED;
}
UDF_LOG_ERROR("Enqueue failed, hicard_ret=%d, input_idx=%zu, qid=%u", hicard_ret, input_idx, input_queue_info.queue_id);
return FLOW_FUNC_ERR_QUEUE_ERROR;
}
int32_t FlowModelImpl::DequeueMbuf(size_t output_idx, Mbuf *&mbuf) const {
uint32_t qid = output_queue_infos_[output_idx].queue_id;
int32_t hicaid_ret = output_queue_wrappers_[output_idx]->Dequeue(mbuf);
if (hicaid_ret == HICAID_SUCCESS) {
UDF_LOG_DEBUG("dequeue for queue[%u] success", qid);
return FLOW_FUNC_SUCCESS;
} else if (hicaid_ret == HICAID_ERR_QUEUE_EMPTY) {
UDF_LOG_DEBUG("queue is empty, queue_id=%u", qid);
return FLOW_FUNC_ERR_QUEUE_EMPTY;
} else {
UDF_LOG_ERROR("Dequeue failed, hicaid_ret=%d, queue_id=%u", hicaid_ret, qid);
return FLOW_FUNC_ERR_QUEUE_ERROR;
}
}
int32_t FlowModelImpl::WaitAndHandleEvent(bool &is_continue) const {
is_continue = false;
struct event_info event = {};
handle_event_ = false;
drvError_t dev_ret = halEschedWaitEvent(GlobalConfig::Instance().GetDeviceId(), invoke_model_sched_group_id_,
curr_sched_thread_id_, kDequeWaitPerLoop, &event);
if (dev_ret == DRV_ERROR_NONE) {
handle_event_ = true;
GlobalConfig::Instance().SetCurrentSchedGroupId(invoke_model_sched_group_id_);
UDF_LOG_DEBUG("receive event, event_id=%d, subevent_id=%u ",
static_cast<int32_t>(event.comm.event_id), event.comm.subevent_id);
if (event.comm.event_id == UdfEvent::kEventIdWakeUp) {
is_continue = true;
return FLOW_FUNC_SUCCESS;
}
} else if (dev_ret == DRV_ERROR_SCHED_WAIT_TIMEOUT) {
is_continue = true;
return FLOW_FUNC_ERR_DRV_ERROR;
} else {
UDF_LOG_ERROR("wait event failed, device_id=%u, threadIndex=%u, groupId=%u, dev_ret=%d.",
GlobalConfig::Instance().GetDeviceId(), curr_sched_thread_id_, invoke_model_sched_group_id_,
static_cast<int32_t>(dev_ret));
return FLOW_FUNC_ERR_DRV_ERROR;
}
return FLOW_FUNC_SUCCESS;
}
int32_t FlowModelImpl::DequeueMbuf(size_t output_idx, Mbuf *&mbuf, int32_t timeout) {
const auto &output_queue_info = output_queue_infos_[output_idx];
uint32_t qid = output_queue_info.queue_id;
int32_t ret = FLOW_FUNC_SUCCESS;
if (timeout == 0) {
ret = DequeueMbuf(output_idx, mbuf);
if (ret == FLOW_FUNC_ERR_QUEUE_EMPTY) {
UDF_LOG_ERROR("Dequeue failed as queue empty and timeout=0, queue_id=%u", qid);
ret = FLOW_FUNC_FAILED;
}
return ret;
}
ret = SubQueueEvent(output_queue_info, QUEUE_ENQUE_EVENT);
if (ret != FLOW_FUNC_SUCCESS) {
UDF_LOG_ERROR("sub queue enqueue event failed, queue_id=%u", qid);
return ret;
}
std::function<void()> unsub_func = [this, &output_queue_info]() {
(void)UnsubQueueEvent(output_queue_info, QUEUE_ENQUE_EVENT);
{
std::unique_lock<std::mutex> guard(event_mt_);
can_send_event_ = false;
}
SwapOutInvokeModelEventGroup();
};
const ScopeGuard unsub_guard(unsub_func);
ret = DequeueMbuf(output_idx, mbuf);
if (ret != FLOW_FUNC_ERR_QUEUE_EMPTY) {
return ret;
}
SwapOutGlobalGroup();
if (!output_queue_info.is_proxy_queue) {
std::unique_lock<std::mutex> guard(event_mt_);
can_send_event_ = true;
}
bool is_continue = false;
int64_t wait_time = 0L;
do {
if (CheckException() != FLOW_FUNC_SUCCESS) {
return FLOW_FUNC_FAILED;
}
if (output_queue_info.is_proxy_queue) {
wait_time += kProxyQueueDefaultDequeueTimeout;
} else {
ret = WaitAndHandleEvent(is_continue);
if ((ret == FLOW_FUNC_ERR_DRV_ERROR) && is_continue) {
wait_time += kDequeWaitPerLoop;
continue;
} else if (ret == FLOW_FUNC_ERR_DRV_ERROR) {
return FLOW_FUNC_ERR_DRV_ERROR;
} else if (is_continue) {
continue;
}
}
ret = DequeueMbuf(output_idx, mbuf);
if (ret != FLOW_FUNC_ERR_QUEUE_EMPTY) {
return ret;
}
} while (((wait_time < timeout) || (timeout == -1)) && (!GlobalConfig::Instance().GetAbnormalStatus()) &&
(!GlobalConfig::Instance().GetExitFlag()));
if (GlobalConfig::Instance().GetAbnormalStatus()) {
UDF_LOG_ERROR("Stop dequeue result of now system status is abnormal. Wait to redeploy.");
return FLOW_FUNC_STATUS_REDEPLOYING;
}
UDF_LOG_ERROR("wait event timeout, wait_time=%ld, timeout=%d(ms), output_idx=%zu, inputQueueSize=%s, outputQueueSize=%s",
wait_time, timeout, output_idx,
ToString(GetQueueSize(input_queue_wrappers_)).c_str(), ToString(GetQueueSize(output_queue_wrappers_)).c_str());
return FLOW_FUNC_ERR_TIME_OUT_ERROR;
}
int32_t FlowModelImpl::ParseMbuf(size_t output_idx, Mbuf *&mbuf, std::shared_ptr<FlowMsg> &flow_msg) const {
uint32_t qid = output_queue_infos_[output_idx].queue_id;
if (mbuf == nullptr) {
UDF_LOG_ERROR("dequeue mbuf null, queue_id=%u", qid);
return FLOW_FUNC_ERR_QUEUE_ERROR;
}
static auto mbuf_deleter = [](Mbuf *buf) { (void)halMbufFree(buf); };
std::shared_ptr<Mbuf> mbuf_ptr(mbuf, mbuf_deleter);
auto mbuf_flow_msg = new(std::nothrow)MbufFlowMsg(mbuf_ptr);
if (mbuf_flow_msg == nullptr) {
UDF_LOG_ERROR("new MbufFlowMsg failed, queue_id=%u.", qid);
return FLOW_FUNC_FAILED;
}
flow_msg.reset(mbuf_flow_msg);
auto init_ret = mbuf_flow_msg->Init();
if (init_ret != FLOW_FUNC_SUCCESS) {
UDF_LOG_ERROR("data init failed, ret=%d, queue_id=%u.", init_ret, qid);
return init_ret;
}
UDF_LOG_DEBUG("output_idx=%zu, qid=%u, fetch msg=%s", output_idx, qid, mbuf_flow_msg->DebugString().c_str());
return FLOW_FUNC_SUCCESS;
}
int32_t FlowModelImpl::Fetch(size_t output_idx, std::shared_ptr<FlowMsg> &flow_msg, int32_t timeout) {
uint32_t qid = output_queue_infos_[output_idx].queue_id;
UDF_LOG_DEBUG("fetch flow msg, queue_id=%u, timeout=%d(ms).", qid, timeout);
Mbuf *mbuf = nullptr;
int32_t ret = DequeueMbuf(output_idx, mbuf, timeout);
if (ret != FLOW_FUNC_SUCCESS) {
UDF_LOG_ERROR("dequeue mbuf failed, queue_id=%u, timeout=%d(ms), ret=%d", qid, timeout, ret);
return ret;
}
ret = ParseMbuf(output_idx, mbuf, flow_msg);
if (ret != FLOW_FUNC_SUCCESS) {
UDF_LOG_ERROR("parse mbuf failed, queue_id=%u, ret=%d", qid, ret);
return ret;
}
return FLOW_FUNC_SUCCESS;
}
void FlowModelImpl::GetMsgs(std::vector<Mbuf *> &data, std::vector<std::shared_ptr<FlowMsg>> &flow_msg) const {
UDF_LOG_DEBUG("Set output data, output num=%zu.", data.size());
for (size_t idx = 0UL; idx < data.size(); ++idx) {
std::shared_ptr<FlowMsg> outputMsg;
int32_t ret = ParseMbuf(idx, data[idx], outputMsg);
if (ret != FLOW_FUNC_SUCCESS) {
UDF_LOG_ERROR("parse mbuf failed, ret=%d, index=%zu", ret, idx);
return;
}
flow_msg.emplace_back(outputMsg);
}
return;
}
int32_t FlowModelImpl::AlignFetch(std::vector<std::shared_ptr<FlowMsg>> &output_msgs, int32_t timeout) {
UDF_LOG_DEBUG("AlignFetch, timeout=%d, output size %u(ms).", timeout, output_queue_infos_.size());
int32_t ret = FLOW_FUNC_SUCCESS;
std::vector<Mbuf *> tmp_data;
while (true) {
data_aligner_->TryTakeExpiredOrOverLimitData(tmp_data);
if (!tmp_data.empty()) {
UDF_LOG_ERROR("has not aligned data.");
return FLOW_FUNC_FAILED;
}
size_t next_idx = data_aligner_->SelectNextIndex();
Mbuf *tmp_buf = nullptr;
ret = DequeueMbuf(next_idx, tmp_buf, timeout);
if (ret != FLOW_FUNC_SUCCESS) {
return ret;
}
uint64_t trans_id = 0;
uint32_t data_label = 0;
ret = data_aligner_->GetTransIdAndDataLabel(tmp_buf, trans_id, data_label);
if (ret != HICAID_SUCCESS || trans_id != current_trans_id_) {
halMbufFree(tmp_buf);
UDF_LOG_ERROR("get trans_id error, idx=%zu, mbuf trans_id %llu, current trans_id %llu.",
next_idx, trans_id, current_trans_id_);
continue;
}
ret = data_aligner_->PushAndAlignData(next_idx, tmp_buf, tmp_data);
if (ret != HICAID_SUCCESS) {
halMbufFree(tmp_buf);
return FLOW_FUNC_FAILED;
}
if (!tmp_data.empty()) {
GetMsgs(tmp_data, output_msgs);
break;
}
}
return ret;
}
int32_t FlowModelImpl::SubQueueEvent(const QueueDevInfo &queue_info, QUEUE_EVENT_TYPE queue_event_type) {
if (queue_info.is_proxy_queue) {
return FLOW_FUNC_SUCCESS;
}
struct QueueSubPara queue_sub_para{};
queue_sub_para.devId = queue_info.device_id;
queue_sub_para.qid = queue_info.queue_id;
queue_sub_para.queType = QUEUE_TYPE_SINGLE;
queue_sub_para.eventType = queue_event_type;
queue_sub_para.groupId = invoke_model_sched_group_id_;
queue_sub_para.flag = QUEUE_SUB_FLAG_SPEC_THREAD;
queue_sub_para.threadId = GlobalConfig::Instance().GetCurrentSchedThreadIdx();
drvError_t dev_ret = halQueueSubEvent(&queue_sub_para);
if ((dev_ret != DRV_ERROR_NONE) && (dev_ret != DRV_ERROR_REPEATED_SUBSCRIBED)) {
UDF_LOG_ERROR("queue sub event failed, dev_ret=%d, queue_id=%u, queue_event_type=%d.",
static_cast<int32_t>(dev_ret), queue_info.queue_id, static_cast<int32_t>(queue_event_type));
return FLOW_FUNC_ERR_EVENT_ERROR;
}
curr_sched_thread_id_ = queue_sub_para.threadId;
return FLOW_FUNC_SUCCESS;
}
int32_t FlowModelImpl::UnsubQueueEvent(const QueueDevInfo &queue_info, QUEUE_EVENT_TYPE queue_event_type) const {
if (queue_info.is_proxy_queue) {
return FLOW_FUNC_SUCCESS;
}
struct QueueUnsubPara queue_unsub_para{};
queue_unsub_para.devId = GlobalConfig::Instance().GetDeviceId();
queue_unsub_para.qid = queue_info.queue_id;
queue_unsub_para.eventType = queue_event_type;
drvError_t dev_ret = halQueueUnsubEvent(&queue_unsub_para);
if (dev_ret != DRV_ERROR_NONE) {
UDF_LOG_ERROR("queue unsub event failed, dev_ret=%d, queue_id=%u, queue_event_type=%d.",
static_cast<int32_t>(dev_ret), queue_info.queue_id, static_cast<int32_t>(queue_event_type));
return FLOW_FUNC_ERR_QUEUE_ERROR;
}
return FLOW_FUNC_SUCCESS;
}
void FlowModelImpl::SwapOutGlobalGroup() const {
if (!GlobalConfig::Instance().IsRunOnAiCpu()) {
return;
}
const uint32_t thread_idx = GlobalConfig::Instance().GetCurrentSchedThreadIdx();
const uint32_t current_sched_group_id = GlobalConfig::Instance().GetCurrentSchedGroupId();
if (current_sched_group_id != invoke_model_sched_group_id_) {
auto dev_ret = halEschedThreadSwapout(GlobalConfig::Instance().GetDeviceId(), current_sched_group_id, thread_idx);
if (dev_ret != DRV_ERROR_NONE) {
UDF_LOG_ERROR("halEschedThreadSwapout failed, groupId=%u, thread_idx=%u, dev_ret=%d",
current_sched_group_id, thread_idx, static_cast<int32_t>(dev_ret));
}
}
}
int32_t FlowModelImpl::CheckException() {
if (data_aligner_ == nullptr) {
return FLOW_FUNC_SUCCESS;
}
uint64_t trans_id = 0;
std::lock_guard<std::mutex> guard(exception_mt_);
while (!exception_wait_report_.empty()) {
trans_id = *(exception_wait_report_.cbegin());
if (trans_id == current_trans_id_) {
UDF_LOG_ERROR("Raise exception by user, trans_id=%lu.", trans_id);
(void)exception_wait_report_.erase(trans_id);
return FLOW_FUNC_FAILED;
} else if (trans_id < current_trans_id_) {
UDF_LOG_INFO("Raise exception that has been fetched, trans_id=%lu.", trans_id);
(void)exception_wait_report_.erase(trans_id);
} else {
break;
}
}
return FLOW_FUNC_SUCCESS;
}
void FlowModelImpl::SwapOutInvokeModelEventGroup() const {
if (!GlobalConfig::Instance().IsRunOnAiCpu()) {
return;
}
if (!handle_event_) {
return;
}
handle_event_ = false;
const uint32_t thread_idx = GlobalConfig::Instance().GetCurrentSchedThreadIdx();
auto dev_ret = halEschedThreadSwapout(GlobalConfig::Instance().GetDeviceId(), invoke_model_sched_group_id_, thread_idx);
if (dev_ret != DRV_ERROR_NONE) {
UDF_LOG_ERROR("halEschedThreadSwapout failed, groupId=%u, thread_idx=%u, dev_ret=%d",
invoke_model_sched_group_id_, thread_idx, static_cast<int32_t>(dev_ret));
}
}
void FlowModelImpl::DeleteExceptionTransId(uint64_t trans_id) {
UDF_LOG_DEBUG("DeleteExceptionTransId, trans_id=%lu.", trans_id);
if (data_aligner_ != nullptr) {
data_aligner_->DeleteExceptionTransId(trans_id);
}
}
void FlowModelImpl::AddExceptionTransId(uint64_t trans_id) {
UDF_LOG_DEBUG("AddExceptionTransId, trans_id=%lu.", trans_id);
if (trans_id != current_trans_id_) {
return;
}
{
std::lock_guard<std::mutex> guard(exception_mt_);
exception_wait_report_.emplace(trans_id);
}
if (data_aligner_ != nullptr) {
data_aligner_->AddExceptionTransId(trans_id);
}
std::unique_lock<std::mutex> guard(event_mt_);
if (!can_send_event_) {
return;
}
event_summary event_info_summary = {};
event_info_summary.pid = getpid();
event_info_summary.event_id = static_cast<EVENT_ID>(UdfEvent::kEventIdWakeUp);
event_info_summary.subevent_id = 0U;
event_info_summary.msg = nullptr;
event_info_summary.msg_len = 0U;
event_info_summary.dst_engine = GlobalConfig::Instance().IsRunOnAiCpu() ? ACPU_LOCAL : CCPU_LOCAL;
event_info_summary.grp_id = invoke_model_sched_group_id_;
event_info_summary.tid = curr_sched_thread_id_;
drvError_t ret = halEschedSubmitEventToThread(GlobalConfig::Instance().GetDeviceId(), &event_info_summary);
if (ret != DRV_ERROR_NONE) {
UDF_LOG_ERROR("Failed to submit event, dev_ret=%d, groupId=%u, threadId=%u.",
static_cast<int32_t>(ret), invoke_model_sched_group_id_, curr_sched_thread_id_);
return;
}
}
}