* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
#include "flow_func_executor.h"
#include <numeric>
#include "securec.h"
#include "common/common_define.h"
#include "common/inner_error_codes.h"
#include "common/udf_log.h"
#include "common/util.h"
#include "common/scope_guard.h"
#include "config/global_config.h"
#include "flow_func/flow_func_timer.h"
#include "flow_func/logger/flow_func_logger_manager.h"
#include "flow_func/flow_func_manager.h"
#include "flow_model_impl.h"
#include "flow_func_drv_manager.h"
#include "memory_statistic_manager.h"
namespace FlowFunc {
// File-local constants used by FlowFuncExecutor.
namespace {
// Timeout value passed to halQueueAttach for the request/response message queues.
// NOTE(review): unit is presumably milliseconds (10 s) - confirm against the driver API.
constexpr int32_t kAttachWaitTimeout = 10 * 1000;
// Second argument given to the AsyncExecutor created in Init().
constexpr int32_t kAsyncExecutorThreadNum = 1;
// Bitmask with one bit per listed event id (bit position == numeric event id).
// NOTE(review): presumably the events the main scheduling thread waits for; the consumer is outside this chunk.
constexpr uint64_t kWaitEventMask =
(1ULL << UdfEvent::kEventIdProcessorInit) |
(1ULL << UdfEvent::kEventIdFlowFuncInit) |
(1ULL << static_cast<uint32_t>(EVENT_QUEUE_EMPTY_TO_NOT_EMPTY)) |
(1ULL << static_cast<uint32_t>(EVENT_QUEUE_FULL_TO_NOT_FULL)) |
(1ULL << UdfEvent::kEventIdTimer) |
(1ULL << UdfEvent::kEventIdFlowFuncReportStatus) |
(1ULL << UdfEvent::kEventIdNotifyThreadExit) |
(1ULL << UdfEvent::kEventIdFlowFuncSuspendFinished) |
(1ULL << UdfEvent::kEventIdFlowFuncRecoverFinished) |
(1ULL << UdfEvent::kEventIdSwitchToSoftSchedMode) |
(1ULL << UdfEvent::kEventIdRaiseException) |
(1ULL << UdfEvent::kEventIdSingleFlowFuncInit);
// Bitmask covering only the "execute flow func" and "thread exit" events.
// NOTE(review): presumably used by worker threads - confirm at the use site.
constexpr uint64_t kWorkerWaitEventMask =
(1ULL << UdfEvent::kEventIdFlowFuncExecute) |
(1ULL << UdfEvent::kEventIdNotifyThreadExit);
// Bitmask covering the two queue state-change events plus the wake-up event.
// NOTE(review): presumably used while waiting on an invoked model - confirm at the use site.
constexpr uint64_t kInvokeModelWaitEventMask =
(1ULL << static_cast<uint32_t>(EVENT_QUEUE_EMPTY_TO_NOT_EMPTY)) |
(1ULL << static_cast<uint32_t>(EVENT_QUEUE_FULL_TO_NOT_FULL)) |
(1ULL << UdfEvent::kEventIdWakeUp);
// Bitmask covering only the "queue became non-empty" event.
constexpr uint64_t kFlowMsgQueueWaitEventMask =
(1ULL << static_cast<uint32_t>(EVENT_QUEUE_EMPTY_TO_NOT_EMPTY));
// Short human-readable description for each control message type.
// NOTE(review): presumably used when building response/log text - the lookup is outside this chunk.
const std::map<ControlMessageType, std::string> kResponseMsgMap = {
{ControlMessageType::kInit, "Execute init"},
{ControlMessageType::kSuspend, "Execute suspend"},
{ControlMessageType::kRecover, "Execute recover"},
{ControlMessageType::kException, "Execute exception message"},
{ControlMessageType::kUnknow, "Parse control message"},
};
}
// Default constructor.
// - constructs the worker thread pool with the string "udf_worker_";
// - fills event_proc_func_map_, the dispatch table from event id to the member function handling it;
// - starts both esched priorities at kUserUnsetESchedPriority (i.e. "no user value yet", see UpdatePriority).
FlowFuncExecutor::FlowFuncExecutor() : udf_thread_pool_("udf_worker_"),
event_proc_func_map_(
{{EVENT_QUEUE_EMPTY_TO_NOT_EMPTY, &FlowFuncExecutor::ProcessEmptyToNotEmptyEvent},
{EVENT_QUEUE_FULL_TO_NOT_FULL, &FlowFuncExecutor::ProcessFullToNoFullEvent},
{UdfEvent::kEventIdProcessorInit, &FlowFuncExecutor::ProcessProcessorInitEvent},
{UdfEvent::kEventIdFlowFuncInit, &FlowFuncExecutor::ProcessFlowFuncInitEvent},
{UdfEvent::kEventIdFlowFuncExecute, &FlowFuncExecutor::ProcessFlowFuncExecuteEvent},
{UdfEvent::kEventIdTimer, &FlowFuncExecutor::ProcessTimerEvent},
{UdfEvent::kEventIdFlowFuncReportStatus, &FlowFuncExecutor::ProcessReportStatusEvent},
{UdfEvent::kEventIdNotifyThreadExit, &FlowFuncExecutor::ProcessNotifyThreadExitEvent},
{UdfEvent::kEventIdFlowFuncSuspendFinished, &FlowFuncExecutor::ProcessReportSuspendEvent},
{UdfEvent::kEventIdFlowFuncRecoverFinished, &FlowFuncExecutor::ProcessReportRecoverEvent},
{UdfEvent::kEventIdSwitchToSoftSchedMode, &FlowFuncExecutor::ProcessSwitchSoftModeEvent},
{UdfEvent::kEventIdRaiseException, &FlowFuncExecutor::ProcessRaiseExceptionEvent},
{UdfEvent::kEventIdSingleFlowFuncInit, &FlowFuncExecutor::ProcessSingleFlowFuncInitEvent}}),
esched_process_priority_(kUserUnsetESchedPriority),
esched_event_priority_(kUserUnsetESchedPriority) {}
// Merges one model's esched priority into the executor-wide value.
// @param user_priority priority configured on a model; kUserUnsetESchedPriority means "not configured".
// @param priority      in/out accumulated priority; replaced by user_priority when it is still unset
//                      or when user_priority is numerically smaller.
void FlowFuncExecutor::UpdatePriority(int32_t user_priority, int32_t &priority) const {
    if (user_priority == kUserUnsetESchedPriority) {
        // Nothing configured on this model: keep whatever has been accumulated so far.
        return;
    }
    const bool accumulated_unset = (priority == kUserUnsetESchedPriority);
    if (accumulated_unset || (user_priority < priority)) {
        priority = user_priority;
    }
}
// Creates one FlowFuncProcessor per entry of input_maps and registers its queues in the executor's lookup tables.
// @param model                   model the processors belong to (instance name and input-align attrs are read).
// @param input_maps              func name -> input queues of that func; must not be empty.
// @param output_index_maps       func name -> indexes into all_outputs_queue_infos; a func without an entry
//                                gets every output queue.
// @param all_outputs_queue_infos all output queues of the model.
// @param params                  shared parameters handed to every created processor.
// @return FLOW_FUNC_SUCCESS, or FLOW_FUNC_FAILED on empty input_maps, allocation failure, an out-of-range
//         output index, or an input queue that is already bound to another processor.
// Note: on failure, entries registered for earlier funcs/queues are not rolled back.
int32_t FlowFuncExecutor::CreateFlowFuncProcessor(const FlowFuncModel &model,
    const std::map<std::string, std::vector<QueueDevInfo>> &input_maps,
    const std::map<std::string, std::vector<uint32_t>> &output_index_maps,
    const std::vector<QueueDevInfo> &all_outputs_queue_infos,
    const std::shared_ptr<FlowFuncParams> &params) {
    const std::string &instance_name = model.GetModelInstanceName();
    if (input_maps.empty()) {
        UDF_LOG_ERROR("func input index size cannot be zero, instance name[%s]", instance_name.c_str());
        return FLOW_FUNC_FAILED;
    }
    // Loop-invariant: the set of funcs whose inputs are consumed as stream input.
    const auto &stream_input_func_names = params->GetStreamInputFuncNames();
    for (const auto &input_map : input_maps) {
        const std::string &func_name = input_map.first;
        const std::vector<QueueDevInfo> &func_input_queues = input_map.second;

        // Resolve which outputs this func writes to; default is "all outputs" (0..N-1).
        std::vector<uint32_t> output_indexes;
        const auto output_iter = output_index_maps.find(func_name);
        if (output_iter == output_index_maps.end()) {
            UDF_LOG_INFO("multi func has no output maps, func name=%s.", func_name.c_str());
            output_indexes.resize(all_outputs_queue_infos.size());
            std::iota(output_indexes.begin(), output_indexes.end(), 0);
        } else {
            output_indexes = output_iter->second;
        }

        std::shared_ptr<FlowFuncProcessor> flow_func_processor(
            new(std::nothrow) FlowFuncProcessor(params, func_name, func_input_queues,
                all_outputs_queue_infos, output_indexes, async_executor_));
        if (flow_func_processor == nullptr) {
            UDF_LOG_ERROR("alloc FlowFuncProcessor failed, flow_func_name=%s", func_name.c_str());
            return FLOW_FUNC_FAILED;
        }
        // The processor's index is its future position in func_processors_.
        const size_t processor_idx = func_processors_.size();
        flow_func_processor->SetProcessorIdx(static_cast<uint32_t>(processor_idx));

        const bool is_stream_input = (stream_input_func_names.count(func_name) > 0);
        for (const auto &input_queue_info : func_input_queues) {
            if (is_stream_input) {
                (void)flow_msg_queues_.insert(input_queue_info.queue_id);
            }
            dev_input_queue_map_[input_queue_info.device_id].emplace(input_queue_info.queue_id,
                input_queue_info.is_proxy_queue);
            queue_dev_set_.emplace(input_queue_info.device_id);
            if (input_queue_info.is_proxy_queue) {
                with_proxy_queue_ = true;
                continue;
            }
            // Stream-input queues are not entered into the queue -> processor table.
            if (is_stream_input) {
                continue;
            }
            // A non-proxy, non-stream input queue may feed exactly one processor.
            const auto context_iter = input_to_flow_func_processor_idx_.find(input_queue_info.queue_id);
            if (context_iter != input_to_flow_func_processor_idx_.cend()) {
                UDF_LOG_ERROR("flow_func_name[%s]'s input_queue_id=%u is used by flow func processor %zu",
                    func_name.c_str(),
                    input_queue_info.queue_id,
                    context_iter->second);
                return FLOW_FUNC_FAILED;
            }
            input_to_flow_func_processor_idx_[input_queue_info.queue_id] = processor_idx;
        }

        std::vector<QueueDevInfo> outputs_queue_infos;
        outputs_queue_infos.reserve(output_indexes.size());
        for (const auto output_index : output_indexes) {
            // Defensive: never index past the model's output list, even if the caller skipped validation.
            if (output_index >= all_outputs_queue_infos.size()) {
                UDF_LOG_ERROR("flow_func_name[%s]'s output index[%u] is invalid, valid range is [0, %zu).",
                    func_name.c_str(), output_index, all_outputs_queue_infos.size());
                return FLOW_FUNC_FAILED;
            }
            const auto &output_queue_info = all_outputs_queue_infos[output_index];
            outputs_queue_infos.emplace_back(output_queue_info);
            // Dummy outputs are kept in the list for logging but are not registered anywhere.
            if (output_queue_info.queue_id == Common::kDummyQId) {
                continue;
            }
            queue_dev_set_.emplace(output_queue_info.device_id);
            dev_output_queue_map_[output_queue_info.device_id].emplace(output_queue_info.queue_id,
                output_queue_info.is_proxy_queue);
            if (output_queue_info.is_proxy_queue) {
                with_proxy_queue_ = true;
                continue;
            }
            // Unlike inputs, one output queue may be shared by several processors.
            output_to_flow_func_processor_idx_[output_queue_info.queue_id].emplace_back(processor_idx);
        }

        const auto &input_align_attrs = model.GetInputAlignAttrs();
        flow_func_processor->SetInputAlignAttrs(input_align_attrs.align_max_cache_num, input_align_attrs.align_timeout,
            input_align_attrs.drop_when_not_align);
        func_processors_.emplace_back(flow_func_processor);
        UDF_RUN_LOG_INFO("Flow func[instance_name:%s, funcName:%s] IO info: inputQs=%s, outputQs=%s.",
            instance_name.c_str(),
            func_name.c_str(),
            ToString(func_input_queues).c_str(),
            ToString(outputs_queue_infos).c_str());
    }
    return FLOW_FUNC_SUCCESS;
}
// Verifies that every index in the per-func input/output maps addresses an existing queue.
// @param input_maps  func name -> input indexes; each index must be < inputs_num.
// @param inputs_num  number of input queues.
// @param output_maps func name -> output indexes; each index must be < outputs_num.
// @param outputs_num number of output queues.
// @return FLOW_FUNC_SUCCESS when all indexes are in range, FLOW_FUNC_FAILED at the first violation.
int32_t FlowFuncExecutor::CheckInputOutputMapsValid(const std::map<std::string, std::vector<uint32_t>> &input_maps,
    uint32_t inputs_num, const std::map<std::string, std::vector<uint32_t>> &output_maps, uint32_t outputs_num) {
    for (const auto &func_inputs : input_maps) {
        for (const uint32_t index : func_inputs.second) {
            if (index < inputs_num) {
                continue;
            }
            UDF_LOG_ERROR("func name[%s]'s input map index[%u] is invalid, valid range is [0, %u).",
                func_inputs.first.c_str(),
                index,
                inputs_num);
            return FLOW_FUNC_FAILED;
        }
    }
    for (const auto &func_outputs : output_maps) {
        for (const uint32_t index : func_outputs.second) {
            if (index < outputs_num) {
                continue;
            }
            UDF_LOG_ERROR("func name[%s]'s output map index[%u] is invalid, valid range is [0, %u).",
                func_outputs.first.c_str(),
                index,
                outputs_num);
            return FLOW_FUNC_FAILED;
        }
    }
    return FLOW_FUNC_SUCCESS;
}
void FlowFuncExecutor::GetRealQueueInfos(const std::string &flow_func_name,
const std::vector<QueueDevInfo> &input_queue_infos,
const std::map<std::string, std::vector<uint32_t>> &multi_func_input_maps,
std::map<std::string, std::vector<QueueDevInfo>> &real_input_queue_maps) {
if (multi_func_input_maps.empty()) {
real_input_queue_maps.emplace(flow_func_name, input_queue_infos);
} else {
for (auto &input_maps : multi_func_input_maps) {
std::vector<QueueDevInfo> queue_infos;
queue_infos.reserve(input_maps.second.size());
std::transform(input_maps.second.cbegin(), input_maps.second.cend(), std::back_inserter(queue_infos),
[&input_queue_infos](uint32_t index) {
return input_queue_infos[index];
});
real_input_queue_maps.emplace(input_maps.first, std::move(queue_infos));
}
}
}
// Collects the queue layout of a model.
// In NPU-sched mode the layout comes from npu_sched_processor_->LoadNpuSchedModel (and so does the return
// code); otherwise it is copied straight from the model and FLOW_FUNC_SUCCESS is returned.
int32_t FlowFuncExecutor::GetModelQueueInfos(const FlowFuncModel &model, ModelQueueInfos &model_queue_infos) {
    if (!GlobalConfig::Instance().IsNpuSched()) {
        model_queue_infos.input_queues = model.GetInputQueues();
        model_queue_infos.output_queues = model.GetOutputQueues();
        model_queue_infos.func_input_maps = model.GetMultiFuncInputMaps();
        model_queue_infos.func_output_maps = model.GetMultiFuncOutputMaps();
        return FLOW_FUNC_SUCCESS;
    }
    return npu_sched_processor_->LoadNpuSchedModel(model, model_queue_infos);
}
int32_t FlowFuncExecutor::InitNpuSchedProcessor() {
if (GlobalConfig::Instance().IsNpuSched()) {
npu_sched_processor_ = MakeShared<NpuSchedProcessor>();
if (npu_sched_processor_ == nullptr) {
UDF_LOG_ERROR("failed to alloc npu sched processor.");
return FLOW_FUNC_FAILED;
}
int32_t init_ret = npu_sched_processor_->Initialize(GlobalConfig::Instance().GetRunningDeviceId());
if (init_ret != FLOW_FUNC_SUCCESS) {
(void)SendMessageByResponseQueue(ControlMessageType::kInit, init_ret);
return init_ret;
}
int32_t send_ret = SendMessageByResponseQueue(ControlMessageType::kInit, FLOW_FUNC_SUCCESS);
if (send_ret != FLOW_FUNC_SUCCESS) {
UDF_LOG_ERROR("failed to send init message.");
return send_ret;
}
Mbuf *control_muff = nullptr;
auto deque_ret = request_queue_wrapper_->DequeueWithTimeout(control_muff, 30 * 1000);
if ((deque_ret != HICAID_SUCCESS)) {
UDF_LOG_ERROR("Dequeue message and wait notify message failed, ret = %d.", deque_ret);
return FLOW_FUNC_FAILED;
}
auto mbuf_deleter = [control_muff]() { (void)halMbufFree(control_muff); };
ScopeGuard mbuf_guard(mbuf_deleter);
ff::deployer::ExecutorRequest req_msg;
RequestMsgType msg_type = RequestMsgType::kControlMsg;
auto parse_ret = ParseRequestMessage(control_muff, req_msg, msg_type);
if (parse_ret != FLOW_FUNC_SUCCESS) {
UDF_LOG_ERROR("Parse notify failed, ret = %d.", parse_ret);
return parse_ret;
}
if (msg_type != RequestMsgType::kNotify) {
UDF_LOG_ERROR("expect notify message=%d but got %d message.", RequestMsgType::kNotify, msg_type);
return parse_ret;
}
UDF_RUN_LOG_INFO("receive deployer notify continue message.");
}
return FLOW_FUNC_SUCCESS;
}
// Strips the data-flow scope prefix from an invoked-model key.
// @param name_with_scope key as stored in the model, expected to start with `scope`.
// @param scope           prefix to remove; when empty the key is returned unchanged.
// @param invoke_name     out: the key without the prefix (untouched on failure).
// @return FLOW_FUNC_SUCCESS, or FLOW_FUNC_FAILED when a non-empty scope is not a prefix of the key.
int32_t FlowFuncExecutor::GetUsrInvokedModelKey(const std::string &name_with_scope, const std::string &scope,
    std::string &invoke_name) {
    if (scope.empty()) {
        invoke_name = name_with_scope;
        return FLOW_FUNC_SUCCESS;
    }
    const std::size_t scope_len = scope.size();
    const bool starts_with_scope =
        (name_with_scope.size() >= scope_len) && (name_with_scope.compare(0, scope_len, scope) == 0);
    if (!starts_with_scope) {
        UDF_LOG_ERROR("Get the usr invoked model key failed, dataFlowScope=%s, invoked model key=%s.",
            scope.c_str(), name_with_scope.c_str());
        return FLOW_FUNC_FAILED;
    }
    invoke_name = name_with_scope.substr(scope_len);
    return FLOW_FUNC_SUCCESS;
}
int32_t FlowFuncExecutor::CreateFlowModel(const std::shared_ptr<FlowFuncParams> ¶ms, const FlowFuncModel &model) {
const auto &name = model.GetName();
const auto &flow_func_name = model.GetFlowFuncName();
const auto &invokedModelQueueInfos = model.GetInvokedModelQueueInfos();
const auto &input_align_attrs = model.GetInputAlignAttrs();
for (const auto &invoked_model_queue_info : invokedModelQueueInfos) {
std::string invoke_name;
if (GetUsrInvokedModelKey(invoked_model_queue_info.first, model.GetDataFlowScope(), invoke_name) != FLOW_FUNC_SUCCESS) {
UDF_LOG_ERROR("GetUsrInvokedModelKey failed, name=%s, flow_func_name=%s, invoked model key=%s.",
name.c_str(), flow_func_name.c_str(), invoked_model_queue_info.first.c_str());
return FLOW_FUNC_FAILED;
}
std::unique_ptr<DataAligner> dataAligner;
if (input_align_attrs.align_max_cache_num > 0) {
dataAligner.reset(new(std::nothrow) DataAligner(invoked_model_queue_info.second.fetch_queue_infos.size(),
input_align_attrs.align_max_cache_num,
input_align_attrs.align_timeout, input_align_attrs.drop_when_not_align));
if (dataAligner == nullptr) {
UDF_LOG_ERROR("alloc DataAligner failed, name=%s, flow_func_name=%s.", name.c_str(),
flow_func_name.c_str());
return FLOW_FUNC_FAILED;
}
}
std::unique_ptr<FlowModel> impl(
new(std::nothrow) FlowModelImpl(invoked_model_queue_info.second.feed_queue_infos,
invoked_model_queue_info.second.fetch_queue_infos, std::move(dataAligner)));
if (impl == nullptr) {
UDF_LOG_ERROR("alloc FlowModelImpl failed, name=%s, flow_func_name=%s, invoked model key=%s.",
name.c_str(), flow_func_name.c_str(), invoke_name.c_str());
return FLOW_FUNC_FAILED;
}
params->AddFlowModel(invoke_name, std::move(impl));
UDF_RUN_LOG_INFO("Flow func(ppName:%s) invoke model IO info: invoked key=%s, feedQs=%s, fetchQs=%s.",
name.c_str(), invoke_name.c_str(),
ToString(invoked_model_queue_info.second.feed_queue_infos).c_str(),
ToString(invoked_model_queue_info.second.fetch_queue_infos).c_str());
for (const auto &feed_queue_info : invoked_model_queue_info.second.feed_queue_infos) {
with_proxy_queue_ = with_proxy_queue_ || feed_queue_info.is_proxy_queue;
queue_dev_set_.emplace(feed_queue_info.device_id);
}
for (const auto &fetch_queue_info : invoked_model_queue_info.second.fetch_queue_infos) {
with_proxy_queue_ = with_proxy_queue_ || fetch_queue_info.is_proxy_queue;
queue_dev_set_.emplace(fetch_queue_info.device_id);
}
}
return FLOW_FUNC_SUCCESS;
}
// Initializes the executor for the given models.
// Steps: message queues -> NPU-sched processor -> async executor -> per model (queue layout, validation,
// FlowFuncParams, invoked flow models, flow func processors, priorities, cpu number) -> driver init.
// @param models models to run; every entry must be non-null.
// @return FLOW_FUNC_SUCCESS, or the error code of the first failing step.
int32_t FlowFuncExecutor::Init(const std::vector<std::unique_ptr<FlowFuncModel>> &models) {
    UDF_RUN_LOG_INFO("Init start, model num=%zu.", models.size());
    int32_t init_ret = InitMessageQueue();
    if (init_ret != FLOW_FUNC_SUCCESS) {
        UDF_LOG_ERROR("Failed to init message queue.");
        return init_ret;
    }
    init_ret = InitNpuSchedProcessor();
    if (init_ret != FLOW_FUNC_SUCCESS) {
        UDF_LOG_ERROR("Failed to init npu sched processor.");
        return init_ret;
    }
    async_executor_ = MakeShared<AsyncExecutor>("udf_async_", kAsyncExecutorThreadNum);
    // MakeShared results are null-checked elsewhere in this file; the executor is handed to every processor.
    if (async_executor_ == nullptr) {
        UDF_LOG_ERROR("Failed to alloc async executor.");
        return FLOW_FUNC_FAILED;
    }
    bool is_cpu_num_from_attr = false;
    for (size_t model_idx = 0UL; model_idx < models.size(); ++model_idx) {
        const auto &model = models[model_idx];
        if (model == nullptr) {
            UDF_LOG_ERROR("model is null, idx=%zu.", model_idx);
            return FLOW_FUNC_FAILED;
        }
        const auto &name = model->GetName();
        const auto &instance_name = model->GetModelInstanceName();
        const auto &flow_func_name = model->GetFlowFuncName();

        // Queue layout, either from the model itself or from the NPU-sched processor.
        ModelQueueInfos model_queue_infos = {};
        if (GetModelQueueInfos(*model, model_queue_infos) != FLOW_FUNC_SUCCESS) {
            UDF_LOG_ERROR("failed to get model[%s] queue infos.", instance_name.c_str());
            return FLOW_FUNC_FAILED;
        }
        const auto &input_queue_infos = model_queue_infos.input_queues;
        const auto &output_queue_infos = model_queue_infos.output_queues;
        const auto &multi_func_input_maps = model_queue_infos.func_input_maps;
        const auto &multi_func_output_maps = model_queue_infos.func_output_maps;
        const auto &stream_input_func_names = model->GetStreamInputFuncNames();
        // Must pass before the maps are used to index the queue vectors below.
        if (CheckInputOutputMapsValid(multi_func_input_maps, input_queue_infos.size(), multi_func_output_maps,
            output_queue_infos.size()) != FLOW_FUNC_SUCCESS) {
            UDF_LOG_ERROR("instance name[%s]'s func inputs/outputs map is invalid.", instance_name.c_str());
            return FLOW_FUNC_FAILED;
        }

        int32_t running_device_id = 0;
        if (GlobalConfig::Instance().IsOnDevice()) {
            running_device_id = static_cast<int32_t>(GlobalConfig::Instance().GetDeviceId());
        } else {
            running_device_id = GlobalConfig::Instance().GetRunningDeviceId();
        }
        std::shared_ptr<FlowFuncParams> params = MakeShared<FlowFuncParams>(name, input_queue_infos.size(),
            output_queue_infos.size(), running_device_id, GlobalConfig::Instance().GetDeviceId());
        if (params == nullptr) {
            UDF_LOG_ERROR(
                "Failed to create params, name[%s], output queue size[%zu].", name.c_str(), output_queue_infos.size());
            return FLOW_FUNC_FAILED;
        }
        params->SetLibPath(model->GetLibPath());
        params->SetAttrMap(model->GetNodeAttrMap());
        params->SetWorkPath(model->GetWorkPath());
        params->SetStreamInputFuncNames(stream_input_func_names);
        params->SetModelUuid(model->GetModelUuid());
        params->SetHeadInfo(model->IsHead());
        params->SetInstanceName(instance_name);
        params->SetScope(model->GetScope());
        params->SetRunningInstanceId(model->GetReplicaIdx());
        params->SetRunningInstanceNum(model->GetReplicaNum());
        params->SetInvokedScopes(model->GetInvokedScopes());
        // Status/exception reporting settings are only read from the model outside NPU-sched mode.
        if (!GlobalConfig::Instance().IsNpuSched()) {
            const QueueDevInfo &status_output_queue = model->GetStatusOutputQueue();
            params->SetStatusOutputQueue(status_output_queue);
            const bool need_report_status = model->NeedReportStatus();
            params->SetNeedReportStatusFlag(need_report_status);
            params->SetEnableRaiseException(model->GetEnableRaiseException());
            if (need_report_status || model->GetEnableRaiseException()) {
                status_output_queue_map_[status_output_queue.device_id].emplace_back(status_output_queue.queue_id);
                with_proxy_queue_ = with_proxy_queue_ || status_output_queue.is_proxy_queue;
                queue_dev_set_.emplace(status_output_queue.device_id);
            }
        }

        if (CreateFlowModel(params, *model) != FLOW_FUNC_SUCCESS) {
            UDF_LOG_ERROR("create flow model failed, name=%s, flow_func_name=%s.", name.c_str(), flow_func_name.c_str());
            return FLOW_FUNC_FAILED;
        }
        std::map<std::string, std::vector<QueueDevInfo>> real_input_queue_maps;
        GetRealQueueInfos(flow_func_name, input_queue_infos, multi_func_input_maps, real_input_queue_maps);
        if (CreateFlowFuncProcessor(*model, real_input_queue_maps, multi_func_output_maps, output_queue_infos, params) !=
            FLOW_FUNC_SUCCESS) {
            UDF_LOG_ERROR(
                "create multi func processor failed, name=%s, flow_func_name=%s", name.c_str(), flow_func_name.c_str());
            return FLOW_FUNC_FAILED;
        }
        func_params_.emplace_back(params);
        UpdatePriority(model->GetModelEschedProcessPriority(), esched_process_priority_);
        UpdatePriority(model->GetModelEschedEventPriority(), esched_event_priority_);

        if (!GlobalConfig::Instance().IsOnDevice()) {
            uint32_t cpu_num = 1U;
            bool is_attr_get = false;
            if (model->GetCpuNumFromAttr(cpu_num, is_attr_get) != FLOW_FUNC_SUCCESS) {
                UDF_LOG_ERROR("get cpu num from attr fail, name=%s, flow_func_name=%s.",
                    name.c_str(), flow_func_name.c_str());
                return FLOW_FUNC_FAILED;
            }
            if (is_attr_get) {
                is_cpu_num_from_attr = true;
                // NOTE(review): the "+ 1U" is applied once per model that carries the attribute, so with
                // several such models cpu_num_ keeps growing even when cpu_num is not larger - confirm
                // whether "max over models, plus one" was intended.
                cpu_num_ = (cpu_num > cpu_num_ ? cpu_num : cpu_num_) + 1U;
            }
        }
        UDF_RUN_LOG_INFO("[UdfModelEschedPriority]flow func init end, name=%s, flow_func_name=%s, idx=%zu, "
            "process priority=%d, event priority=%d.", name.c_str(), flow_func_name.c_str(), model_idx,
            esched_process_priority_, esched_event_priority_);
    }
    // Without an attribute-provided value: one thread per processor plus one extra.
    if (!is_cpu_num_from_attr) {
        cpu_num_ = static_cast<uint32_t>(func_processors_.size() + 1U);
    }
    UDF_LOG_INFO("cpu num is %u.", cpu_num_);
    const auto drv_ret = InitDrv();
    if (drv_ret != FLOW_FUNC_SUCCESS) {
        UDF_LOG_ERROR("Init driver failed.");
        return drv_ret;
    }
    return FLOW_FUNC_SUCCESS;
}
// Tears the executor down: unsubscribes the input and output queues, drops the queue -> processor
// tables and the processors themselves, finalizes the NPU-sched processor if one was created,
// and resets the FlowFuncManager singleton.
void FlowFuncExecutor::Destroy() {
// Unsubscribe first, before the lookup tables and processors are cleared below.
UnsubscribeInputQueue();
UnsubscribeOutputQueue();
input_to_flow_func_processor_idx_.clear();
output_to_flow_func_processor_idx_.clear();
func_processors_.clear();
// Only set when InitNpuSchedProcessor() ran in NPU-sched mode.
if (npu_sched_processor_ != nullptr) {
npu_sched_processor_->Finalize();
}
FlowFuncManager::Instance().Reset();
}
int32_t FlowFuncExecutor::SetExecutorEschedPriority() const {
uint32_t device_id = GlobalConfig::Instance().GetDeviceId();
if (esched_process_priority_ != kUserUnsetESchedPriority) {
const drvError_t drv_ret = halEschedSetPidPriority(device_id,
static_cast<SCHEDULE_PRIORITY>(esched_process_priority_));
if (drv_ret != DRV_ERROR_NONE) {
UDF_LOG_ERROR("Failed to set pid priority, device_id=%u, priority=%d, drv_ret=%d.",
device_id, esched_process_priority_, static_cast<int32_t>(drv_ret));
return FLOW_FUNC_ERR_DRV_ERROR;
}
UDF_LOG_INFO("[UdfModelEschedPriority] Succeed to set eshced process priority=%d.",
esched_process_priority_);
}
if (esched_event_priority_ != kUserUnsetESchedPriority) {
auto drv_ret = halEschedSetEventPriority(device_id, EVENT_QUEUE_EMPTY_TO_NOT_EMPTY,
static_cast<SCHEDULE_PRIORITY>(esched_event_priority_));
if (drv_ret != DRV_ERROR_NONE) {
UDF_LOG_ERROR("Failed to set event[%d] priority, device_id=%u, priority=%d, drv_ret=%d.",
static_cast<int32_t>(EVENT_QUEUE_EMPTY_TO_NOT_EMPTY), device_id, esched_event_priority_,
static_cast<int32_t>(drv_ret));
return FLOW_FUNC_ERR_DRV_ERROR;
}
drv_ret = halEschedSetEventPriority(device_id, EVENT_QUEUE_FULL_TO_NOT_FULL,
static_cast<SCHEDULE_PRIORITY>(esched_event_priority_));
if (drv_ret != DRV_ERROR_NONE) {
UDF_LOG_ERROR("Failed to set event[%d] priority, device_id=%u, priority=%d, drv_ret=%d.",
static_cast<int32_t>(EVENT_QUEUE_FULL_TO_NOT_FULL), device_id, esched_event_priority_,
static_cast<int32_t>(drv_ret));
return FLOW_FUNC_ERR_DRV_ERROR;
}
UDF_LOG_INFO("[UdfModelEschedPriority] Succeed to set eshced event priority=%d.",
esched_event_priority_);
}
return FLOW_FUNC_SUCCESS;
}
int32_t FlowFuncExecutor::InitQueue() const {
if (with_proxy_queue_) {
constexpr uint32_t kWaitTimeout = 60 * 1000;
(void)FlowFuncDrvManager::Instance().WaitBindHostPid(kWaitTimeout);
}
for (uint32_t queue_device_id : queue_dev_set_) {
drvError_t drv_ret = halQueueInit(queue_device_id);
if ((drv_ret != DRV_ERROR_NONE) && (drv_ret != DRV_ERROR_REPEATED_INIT)) {
UDF_LOG_ERROR("halQueueInit error, queue_device_id=%u, drv_ret=%d", queue_device_id,
static_cast<int32_t>(drv_ret));
return FLOW_FUNC_ERR_QUEUE_ERROR;
}
UDF_LOG_INFO("halQueueInit success, queue_device_id=%u", queue_device_id);
}
return FLOW_FUNC_SUCCESS;
}
// Driver-side initialization: queue init on all involved devices, then esched priorities.
// @return FLOW_FUNC_SUCCESS, or the error code of the step that failed.
int32_t FlowFuncExecutor::InitDrv() const {
    UDF_RUN_LOG_INFO("ready to init drv, withProxyQueue=%d", static_cast<int32_t>(with_proxy_queue_));

    const int32_t queue_ret = InitQueue();
    if (queue_ret != FLOW_FUNC_SUCCESS) {
        UDF_LOG_ERROR("init queue failed, ret=%d.", queue_ret);
        return queue_ret;
    }

    const int32_t priority_ret = SetExecutorEschedPriority();
    if (priority_ret != FLOW_FUNC_SUCCESS) {
        UDF_LOG_ERROR("Failed to set esched priority, ret=%d.", priority_ret);
        return priority_ret;
    }
    return FLOW_FUNC_SUCCESS;
}
int32_t FlowFuncExecutor::InitMessageQueue() {
const uint32_t device_id = GlobalConfig::Instance().GetDeviceId();
const uint32_t req_queue_id = GlobalConfig::Instance().GetReqQueueId();
const uint32_t rsp_queue_id = GlobalConfig::Instance().GetRspQueueId();
if ((req_queue_id == UINT32_MAX) || (rsp_queue_id == UINT32_MAX)) {
UDF_LOG_INFO("Message queues are not created.");
return FLOW_FUNC_SUCCESS;
}
try {
request_queue_wrapper_.reset(new(std::nothrow) QueueWrapper(device_id, req_queue_id));
response_queue_wrapper_.reset(new(std::nothrow) QueueWrapper(device_id, rsp_queue_id));
if ((request_queue_wrapper_ == nullptr) || (response_queue_wrapper_ == nullptr)) {
UDF_LOG_ERROR("Make request/response queue wrapper failed.");
return FLOW_FUNC_FAILED;
}
} catch (std::exception &e) {
UDF_LOG_ERROR("Init message queue failed, error=%s.", e.what());
return FLOW_FUNC_FAILED;
}
auto drv_ret = halQueueInit(device_id);
if ((drv_ret != DRV_ERROR_NONE) && (drv_ret != DRV_ERROR_REPEATED_INIT)) {
UDF_LOG_ERROR("halQueueInit error, device_id=%u, drv_ret=%d", device_id, static_cast<int32_t>(drv_ret));
return FLOW_FUNC_ERR_QUEUE_ERROR;
}
UDF_LOG_INFO("halQueueInit success, queue_device_id=%u", device_id);
drv_ret = halQueueAttach(device_id, req_queue_id, kAttachWaitTimeout);
if (drv_ret != DRV_ERROR_NONE) {
UDF_LOG_ERROR("attached request queue[%u] failed, ret[%d], queue_device_id[%u]", req_queue_id,
static_cast<int32_t>(drv_ret), device_id);
return FLOW_FUNC_ERR_QUEUE_ERROR;
}
drv_ret = halQueueAttach(device_id, rsp_queue_id, kAttachWaitTimeout);
if (drv_ret != DRV_ERROR_NONE) {
UDF_LOG_ERROR("attached response queue[%u] failed, ret[%d], queue_device_id[%u]", rsp_queue_id,
static_cast<int32_t>(drv_ret), device_id);
return FLOW_FUNC_ERR_QUEUE_ERROR;
}
UDF_LOG_INFO("Attach message %u %u success.", req_queue_id, rsp_queue_id);
QueueSetInputPara input_param;
QueueSetInput input;
input.queSetWorkMode.qid = req_queue_id;
input.queSetWorkMode.workMode = static_cast<uint32_t>(QUEUE_MODE_PULL);
input_param.inBuff = static_cast<void *>(&input);
input_param.inLen = static_cast<uint32_t>(sizeof(QueueSetInput));
(void)halQueueSet(device_id, QUEUE_SET_WORK_MODE, &input_param);
drv_ret = halQueueSubscribe(device_id, req_queue_id, GlobalConfig::Instance().GetMainSchedGroupId(),
static_cast<int32_t>(QUEUE_TYPE_SINGLE));
if (drv_ret != DRV_ERROR_NONE) {
UDF_LOG_ERROR("Failed to subscribe event for queue[%u], ret[%d].", req_queue_id, static_cast<int32_t>(drv_ret));
return FLOW_FUNC_ERR_QUEUE_ERROR;
}
return FLOW_FUNC_SUCCESS;
}
int32_t FlowFuncExecutor::SendSwitchSoftModeEvent() const {
event_summary event_info_summary = {};
char msg_value[8] = {};
event_info_summary.pid = getpid();
event_info_summary.event_id = static_cast<EVENT_ID>(UdfEvent::kEventIdSwitchToSoftSchedMode);
event_info_summary.subevent_id = 0U;
event_info_summary.msg = msg_value;
event_info_summary.msg_len = static_cast<uint32_t>(sizeof(msg_value));
event_info_summary.dst_engine = GlobalConfig::Instance().IsRunOnAiCpu() ? ACPU_LOCAL : CCPU_LOCAL;
event_info_summary.grp_id = GlobalConfig::Instance().GetMainSchedGroupId();
event_info_summary.tid = 0U;
drvError_t ret = halEschedSubmitEventToThread(GlobalConfig::Instance().GetDeviceId(), &event_info_summary);
if (ret != DRV_ERROR_NONE) {
UDF_RUN_LOG_INFO("Submit switch soft sched mode get ret=%d.", static_cast<int32_t>(ret));
return FLOW_FUNC_ERR_DRV_ERROR;
}
UDF_RUN_LOG_INFO("Submit switch soft sched mode succ.");
return FLOW_FUNC_SUCCESS;
}
// Submits the kEventIdProcessorInit event to the main sched group, trying up to three times with a
// 100ms pause between attempts.
// @return FLOW_FUNC_SUCCESS once a submission succeeds, FLOW_FUNC_ERR_DRV_ERROR when all attempts fail.
int32_t FlowFuncExecutor::SendProcessorInitEvent() const {
    event_summary event_info_summary = {};
    event_info_summary.pid = getpid();
    event_info_summary.event_id = static_cast<EVENT_ID>(UdfEvent::kEventIdProcessorInit);
    event_info_summary.subevent_id = 0U;
    event_info_summary.msg = nullptr;
    event_info_summary.msg_len = 0U;
    event_info_summary.dst_engine = GlobalConfig::Instance().IsRunOnAiCpu() ? ACPU_LOCAL : CCPU_LOCAL;
    event_info_summary.grp_id = GlobalConfig::Instance().GetMainSchedGroupId();

    constexpr int32_t kSubmitEventAttemptNum = 3;
    drvError_t ret = DRV_ERROR_NONE;
    for (int32_t attempt = 1; attempt <= kSubmitEventAttemptNum; ++attempt) {
        ret = halEschedSubmitEvent(GlobalConfig::Instance().GetDeviceId(), &event_info_summary);
        if (ret == DRV_ERROR_NONE) {
            UDF_RUN_LOG_INFO("Submit flow func processor init event success.");
            return FLOW_FUNC_SUCCESS;
        }
        // Only announce and wait for a retry when another attempt will actually follow.
        if (attempt < kSubmitEventAttemptNum) {
            UDF_RUN_LOG_INFO("Submitting flow func processor to init will retry 100ms later, current retry num %d.",
                attempt);
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    }
    UDF_LOG_ERROR("Failed to submit flow func processor init event. drv_ret=%d.", static_cast<int32_t>(ret));
    return FLOW_FUNC_ERR_DRV_ERROR;
}
// Starts the executor: launches the thread pool, initializes the flow func manager, timer and logger
// manager, submits the processor-init and switch-soft-mode events, and arms the periodic timers.
// Returns FLOW_FUNC_SUCCESS, or the error code of the failing step (Stop() is called first on most
// failure paths).
int32_t FlowFuncExecutor::Start() {
// Set before the threads start; it is re-checked after they report ready.
running_ = true;
// Every pool thread runs ThreadLoop(thread_idx); cpu_num_ (computed in Init) is passed as the second argument.
auto ret = udf_thread_pool_.Init(
[this](uint32_t thread_idx) { ThreadLoop(thread_idx); }, cpu_num_);
if (ret != FLOW_FUNC_SUCCESS) {
UDF_LOG_ERROR("thread pool init failed, ret=%d.", ret);
Stop();
return ret;
}
// On AiCpu every pool thread counts as a worker, otherwise one thread is excluded from the count.
// NOTE(review): presumably the excluded thread is the main scheduling thread - confirm in ThreadLoop.
auto worker_num =
GlobalConfig::Instance().IsRunOnAiCpu() ? udf_thread_pool_.GetThreadNum() : (udf_thread_pool_.GetThreadNum() - 1);
GlobalConfig::Instance().SetWorkerNum(worker_num);
udf_thread_pool_.WaitAllThreadReady();
// running_ was set to true above; if it is false now, something cleared it (e.g. Stop()) during thread start-up.
if (!running_) {
UDF_LOG_ERROR("thread start failed.");
return FLOW_FUNC_FAILED;
}
ret = FlowFuncManager::Instance().Init();
if (ret != FLOW_FUNC_SUCCESS) {
UDF_LOG_ERROR("Flow func manager init failed, ret=%d.", ret);
Stop();
return ret;
}
FlowFuncTimer::Instance().Init(GlobalConfig::Instance().GetDeviceId());
ret = FlowFuncLoggerManager::Instance().Init();
if (ret != FLOW_FUNC_SUCCESS) {
UDF_LOG_ERROR("Flow func logger manager init failed, ret=%d.", ret);
Stop();
return ret;
}
ret = SendProcessorInitEvent();
if (ret != FLOW_FUNC_SUCCESS) {
Stop();
return ret;
}
// Best effort: the result of the switch-soft-mode submission is deliberately ignored.
(void)SendSwitchSoftModeEvent();
// Timer whose callback calls DumpMetrics().
statistic_timer_handle_ = FlowFuncTimer::Instance().CreateTimer([this]() {
DumpMetrics();
});
// NOTE(review): unit presumably milliseconds (80 s) - confirm against FlowFuncTimer::StartTimer.
constexpr uint32_t kStatisticTimerPeriod = 80 * 1000;
(void)FlowFuncTimer::Instance().StartTimer(statistic_timer_handle_, kStatisticTimerPeriod, false);
// Parent-exit monitoring is only armed when running on device.
if (GlobalConfig::Instance().IsOnDevice()) {
MonitorParentExit();
}
MonitorTermSignal();
MemoryStatisticManager::Instance().Init(GlobalConfig::Instance().GetMemGroupName());
return FLOW_FUNC_SUCCESS;
}
// Arms a timer that watches the parent process id.
// Once the parent pid differs from the one seen at start (and the start pid was not 1), the callback
// calls Stop() on each of its first five firings; from the sixth on it sends SIGKILL to this process.
void FlowFuncExecutor::MonitorParentExit() {
    auto initial_ppid = getppid();
    const auto watch_parent = [this, initial_ppid]() {
        const auto latest_ppid = getppid();
        const bool parent_unchanged = (latest_ppid == initial_ppid);
        if (parent_unchanged || (initial_ppid == 1)) {
            return;
        }
        // Counts how many times a graceful stop has already been requested.
        static int32_t stop_times = 0;
        if (stop_times >= 5) {
            UDF_RUN_LOG_INFO("parent pid[%d] exit, but udf cannot stop normally, so kill itself.", initial_ppid);
            (void)kill(getpid(), SIGKILL);
            return;
        }
        UDF_RUN_LOG_INFO("parent pid[%d] exit, current_parent_pid=%d, udf will stop, times=%d.", initial_ppid,
            latest_ppid, stop_times);
        Stop();
        ++stop_times;
    };
    monitor_parent_timer_handle_ = FlowFuncTimer::Instance().CreateTimer(watch_parent, false);
    constexpr uint32_t kMonitorParentTimerPeriod = 1 * 1000;
    (void)FlowFuncTimer::Instance().StartTimer(monitor_parent_timer_handle_, kMonitorParentTimerPeriod, false);
}
void FlowFuncExecutor::MonitorTermSignal() {
const auto monitor_func = [this]() {
if (!recv_term_signal_) {
return;
}
recv_term_signal_ = false;
event_summary event_info_summary = {};
event_info_summary.pid = getpid();
event_info_summary.event_id = static_cast<EVENT_ID>(UdfEvent::kEventIdNotifyThreadExit);
event_info_summary.subevent_id = 0;
event_info_summary.msg = nullptr;
event_info_summary.msg_len = 0U;
event_info_summary.dst_engine = GlobalConfig::Instance().IsRunOnAiCpu() ? ACPU_LOCAL : CCPU_LOCAL;
uint32_t thread_num = udf_thread_pool_.GetThreadNum();
for (uint32_t i = 0; i < thread_num; ++i) {
uint32_t main_sched_group_id = GlobalConfig::Instance().GetMainSchedGroupId();
uint32_t worker_sched_group_id = GlobalConfig::Instance().GetWorkerSchedGroupId();
event_info_summary.grp_id = (i == (thread_num - 1) ? main_sched_group_id : worker_sched_group_id);
drvError_t ret = halEschedSubmitEvent(GlobalConfig::Instance().GetDeviceId(), &event_info_summary);
if (ret != DRV_ERROR_NONE) {
UDF_LOG_WARN("Failed to submit notify thread exit event, drv_ret=%d.", static_cast<int32_t>(ret));
}
}
UDF_RUN_LOG_INFO("receive term signal, notify all thread exit end, thread_num=%u.", thread_num);
};
monitor_term_signal_timer_handle_ = FlowFuncTimer::Instance().CreateTimer(monitor_func, false);
constexpr uint32_t kMonitorTermSignalTimerPeriod = 10;
(void)FlowFuncTimer::Instance().StartTimer(monitor_term_signal_timer_handle_, kMonitorTermSignalTimerPeriod, false);
}
// Walks all processors and submits an execute event for each one that reports
// NeedReplenishSchedule(). The result of the submit is intentionally ignored.
void FlowFuncExecutor::CheckReplenishSchedule() {
  for (size_t idx = 0UL; idx < func_processors_.size(); ++idx) {
    const auto &processor = func_processors_[idx];
    if (!processor->NeedReplenishSchedule()) {
      continue;
    }
    UDF_LOG_INFO("processor[%s] need replenish schedule event",
                 processor->GetFlowFuncInfo().c_str());
    (void)ScheduleFlowFunc(idx);
  }
}
// Submits a kEventIdFlowFuncExecute event to the worker sched group, carrying the processor
// index as the sub event id.
// Returns FLOW_FUNC_SUCCESS, or the error code returned by SubmitEvent.
int32_t FlowFuncExecutor::ScheduleFlowFunc(size_t flow_func_processor_idx) const {
  UDF_LOG_DEBUG("schedule flow func, flow_func_processor_idx=%zu.", flow_func_processor_idx);
  const int32_t submit_ret = SubmitEvent(GlobalConfig::Instance().GetWorkerSchedGroupId(),
                                         UdfEvent::kEventIdFlowFuncExecute,
                                         static_cast<uint32_t>(flow_func_processor_idx));
  if (submit_ret == FLOW_FUNC_SUCCESS) {
    return FLOW_FUNC_SUCCESS;
  }
  UDF_LOG_ERROR("Failed to submit flow func execute event. flow_func_processor_idx=%zu.", flow_func_processor_idx);
  return submit_ret;
}
int32_t FlowFuncExecutor::SubmitEvent(uint32_t group_id, uint32_t event_id, uint32_t sub_event_id) {
UDF_LOG_DEBUG("submit event, group_id=%u, event_id=%u, sub_event_id=%u.", group_id, event_id, sub_event_id);
event_summary event_info_summary = {};
event_info_summary.pid = getpid();
event_info_summary.event_id = static_cast<EVENT_ID>(event_id);
event_info_summary.subevent_id = sub_event_id;
event_info_summary.msg = nullptr;
event_info_summary.msg_len = 0U;
event_info_summary.dst_engine = GlobalConfig::Instance().IsRunOnAiCpu() ? ACPU_LOCAL : CCPU_LOCAL;
event_info_summary.grp_id = group_id;
drvError_t ret = halEschedSubmitEvent(GlobalConfig::Instance().GetDeviceId(), &event_info_summary);
if (ret != DRV_ERROR_NONE) {
UDF_LOG_ERROR("Failed to submit event, drv_ret=%d, group_id=%u, event_id=%u, sub_event_id=%u.",
static_cast<int32_t>(ret), group_id, event_id, sub_event_id);
return FLOW_FUNC_ERR_DRV_ERROR;
}
return FLOW_FUNC_SUCCESS;
}
// Requests shutdown: clears running_ and raises the global exit flag. When recv_term_signal is
// true, recv_term_signal_ is set as well; it is never cleared here.
void FlowFuncExecutor::Stop(bool recv_term_signal) {
  running_ = false;
  GlobalConfig::Instance().SetExitFlag(true);
  if (!recv_term_signal) {
    return;
  }
  recv_term_signal_ = true;
}
// Waits on the thread pool, then finalizes the logger manager, stops and deletes the three timers
// owned by the executor, finalizes the timer module, dumps metrics and finalizes memory statistics.
void FlowFuncExecutor::WaitForStop() {
  udf_thread_pool_.WaitForStop();
  FlowFuncLoggerManager::Instance().Finalize();

  // Stops and deletes a timer if its handle is set, then resets the handle.
  const auto release_timer = [](auto &timer_handle) {
    if (timer_handle == nullptr) {
      return;
    }
    (void)FlowFuncTimer::Instance().StopTimer(timer_handle);
    (void)FlowFuncTimer::Instance().DeleteTimer(timer_handle);
    timer_handle = nullptr;
  };
  release_timer(statistic_timer_handle_);
  release_timer(monitor_term_signal_timer_handle_);
  release_timer(monitor_parent_timer_handle_);

  FlowFuncTimer::Instance().Finalize();
  DumpMetrics(true);
  MemoryStatisticManager::Instance().Finalize();
}
int32_t FlowFuncExecutor::SubscribeInvokeModelEvent(uint32_t thread_idx) const {
uint32_t device_id = GlobalConfig::Instance().GetDeviceId();
uint32_t invoke_model_sched_group_id = GlobalConfig::Instance().GetInvokeModelSchedGroupId();
drvError_t ret = halEschedSubscribeEvent(device_id, invoke_model_sched_group_id, thread_idx,
kInvokeModelWaitEventMask);
if (ret != DRV_ERROR_NONE) {
UDF_LOG_ERROR(
"halEschedSubscribeEvent failed, device_id[%u], group_id[%u], thread_idx[%u] eventBitmap[%lu].",
device_id, invoke_model_sched_group_id, thread_idx, kInvokeModelWaitEventMask);
return FLOW_FUNC_ERR_DRV_ERROR;
}
struct event_info event = {};
ret = halEschedWaitEvent(device_id, invoke_model_sched_group_id, thread_idx, 0, &event);
if (ret != DRV_ERROR_NO_EVENT) {
UDF_LOG_ERROR(
"halEschedWaitEvent ret=%d must be no event=%d, device_id[%u], group_id[%u], thread_idx[%u] eventBitmap[%lu].",
static_cast<int32_t>(ret), static_cast<int32_t>(DRV_ERROR_NO_EVENT), device_id, invoke_model_sched_group_id,
thread_idx,
kInvokeModelWaitEventMask);
return FLOW_FUNC_ERR_DRV_ERROR;
}
return FLOW_FUNC_SUCCESS;
}
int32_t FlowFuncExecutor::SubscribeFlowMsgQueueEvent(uint32_t thread_idx) const {
uint32_t device_id = GlobalConfig::Instance().GetDeviceId();
uint32_t flow_msg_queue_sched_group_id = GlobalConfig::Instance().GetFlowMsgQueueSchedGroupId();
drvError_t ret = halEschedSubscribeEvent(device_id, flow_msg_queue_sched_group_id, thread_idx,
kFlowMsgQueueWaitEventMask);
if (ret != DRV_ERROR_NONE) {
UDF_LOG_ERROR(
"halEschedSubscribeEvent failed, device_id[%u], group_id[%u], thread_idx[%u] eventBitmap[%lu].",
device_id, flow_msg_queue_sched_group_id, thread_idx, kFlowMsgQueueWaitEventMask);
return FLOW_FUNC_ERR_DRV_ERROR;
}
struct event_info event = {};
ret = halEschedWaitEvent(device_id, flow_msg_queue_sched_group_id, thread_idx, 0, &event);
if (ret != DRV_ERROR_NO_EVENT) {
UDF_LOG_ERROR(
"halEschedWaitEvent ret=%d must be no event=%d, device_id[%u], group_id[%u], thread_idx[%u] eventBitmap[%lu].",
static_cast<int32_t>(ret), static_cast<int32_t>(DRV_ERROR_NO_EVENT), device_id, flow_msg_queue_sched_group_id,
thread_idx,
kFlowMsgQueueWaitEventMask);
return FLOW_FUNC_ERR_DRV_ERROR;
}
return FLOW_FUNC_SUCCESS;
}
void FlowFuncExecutor::ThreadLoop(uint32_t thread_idx) {
GlobalConfig::Instance().SetCurrentSchedThreadIdx(thread_idx);
if (!GlobalConfig::Instance().IsLimitRunBuiltinUdf()) {
int32_t thread_ret = FlowFuncThreadPool::ThreadSecureCompute();
if (thread_ret != FLOW_FUNC_SUCCESS) {
Stop();
udf_thread_pool_.ThreadAbnormal(thread_idx);
UDF_LOG_ERROR("ThreadSecureCompute failed, thread_idx[%u].", thread_idx);
return;
}
}
int32_t ret = SubscribeInvokeModelEvent(thread_idx);
if (ret != FLOW_FUNC_SUCCESS) {
Stop();
udf_thread_pool_.ThreadAbnormal(thread_idx);
UDF_LOG_ERROR("SubscribeInvokeModelEvent failed, thread_idx[%u].", thread_idx);
return;
}
ret = SubscribeFlowMsgQueueEvent(thread_idx);
if (ret != FLOW_FUNC_SUCCESS) {
Stop();
udf_thread_pool_.ThreadAbnormal(thread_idx);
UDF_LOG_ERROR("SubscribeFlowMsgQueueEvent failed, thread_idx[%u].", thread_idx);
return;
}
uint32_t device_id = GlobalConfig::Instance().GetDeviceId();
uint32_t main_sched_group_id = GlobalConfig::Instance().GetMainSchedGroupId();
uint32_t worker_sched_group_id = GlobalConfig::Instance().GetWorkerSchedGroupId();
uint32_t sched_group_id = (thread_idx == (cpu_num_ - 1) ? main_sched_group_id : worker_sched_group_id);
uint64_t wait_event_mask = (thread_idx == (cpu_num_ - 1) ? kWaitEventMask : kWorkerWaitEventMask);
if (GlobalConfig::Instance().IsOnDevice()) {
wait_event_mask = (kWaitEventMask | kWorkerWaitEventMask);
}
drvError_t drv_ret = halEschedSubscribeEvent(device_id, sched_group_id, thread_idx, wait_event_mask);
if (drv_ret != DRV_ERROR_NONE) {
Stop();
udf_thread_pool_.ThreadAbnormal(thread_idx);
UDF_LOG_ERROR("halEschedSubscribeEvent failed, device_id[%u], group_id[%u], thread_idx[%u] eventBitmap[%lu].",
device_id, sched_group_id, thread_idx, wait_event_mask);
return;
}
UDF_RUN_LOG_INFO("thread[%u] subscribe event end, sched_group_id=%u, eventBitmap=%lu.",
thread_idx, sched_group_id, wait_event_mask);
udf_thread_pool_.ThreadReady(thread_idx);
struct event_info event = {};
constexpr int32_t kWaitTimeout = 2000;
uint32_t timeout_times = 0U;
while (running_) {
drvError_t sched_ret = halEschedWaitEvent(device_id, sched_group_id, thread_idx, kWaitTimeout, &event);
if (sched_ret == DRV_ERROR_NONE) {
GlobalConfig::Instance().SetCurrentSchedGroupId(sched_group_id);
ProcessEvent(event, thread_idx);
timeout_times = 0U;
} else if (sched_ret == DRV_ERROR_SCHED_WAIT_TIMEOUT) {
if ((timeout_times % 10U) == 0U) {
UDF_LOG_DEBUG("wait event timeout,thread index=%u, continuous timeout times=%u.",
thread_idx, timeout_times);
}
++timeout_times;
if (thread_idx == (cpu_num_ - 1)) {
CheckReplenishSchedule();
}
} else {
UDF_LOG_ERROR("wait event failed, device_id=%u, threadIndex=%u, group_id=%u, sched_ret=%d.",
device_id, thread_idx, sched_group_id, static_cast<int32_t>(sched_ret));
}
}
UDF_RUN_LOG_INFO("flow func thread[%u] exit.", thread_idx);
}
// Looks up the handler registered for the event id in event_proc_func_map_ and invokes it.
// Events without a registered handler are logged and dropped.
void FlowFuncExecutor::ProcessEvent(const struct event_info &event, uint32_t thread_idx) {
  const auto event_id = static_cast<uint32_t>(event.comm.event_id);
  const auto handler_iter = event_proc_func_map_.find(event_id);
  if (handler_iter != event_proc_func_map_.cend()) {
    const auto handler = handler_iter->second;
    (this->*handler)(event, thread_idx);
    return;
  }
  UDF_LOG_ERROR("no proc func found, event_id=%u, thread_idx=%u", event_id, thread_idx);
}
// Handles the processor-init event: initializes every processor, subscribes output, input and
// status output queues (in that order) and finally submits the flow func init event.
// The first failing step is logged, Stop() is called and the remaining steps are skipped.
void FlowFuncExecutor::ProcessProcessorInitEvent(const struct event_info &event, uint32_t thread_idx) {
  (void)event;
  const uint32_t device_id = GlobalConfig::Instance().GetDeviceId();
  UDF_RUN_LOG_INFO("process processor init event start, thread_idx=%u.", thread_idx);

  for (const auto &processor : func_processors_) {
    const int32_t init_ret = processor->Init(device_id);
    if (init_ret != FLOW_FUNC_SUCCESS) {
      UDF_LOG_ERROR("flow_func_processor init failed, flowFuncInfo=%s, ret=%d.",
                    processor->GetFlowFuncInfo().c_str(),
                    init_ret);
      Stop();
      return;
    }
  }

  UDF_RUN_LOG_INFO("start to subscribe output queues.");
  const int32_t output_ret = SubscribeOutputQueue();
  if (output_ret != FLOW_FUNC_SUCCESS) {
    UDF_LOG_ERROR("SubscribeOutputQueue failed, ret=%d", output_ret);
    Stop();
    return;
  }

  UDF_RUN_LOG_INFO("start to subscribe input queues.");
  const int32_t input_ret = SubscribeInputQueue();
  if (input_ret != FLOW_FUNC_SUCCESS) {
    UDF_LOG_ERROR("SubscribeInputQueue failed, ret=%d", input_ret);
    Stop();
    return;
  }

  UDF_RUN_LOG_INFO("start to subscribe status output queues.");
  const int32_t status_ret = SubscribeStatusOutputQueue();
  if (status_ret != FLOW_FUNC_SUCCESS) {
    UDF_LOG_ERROR("SubscribeStatusOutputQueue failed, ret=%d", status_ret);
    Stop();
    return;
  }

  const int32_t sched_ret = ScheduleFlowFuncInit();
  if (sched_ret != FLOW_FUNC_SUCCESS) {
    UDF_LOG_ERROR("schedule flow func init failed, ret=%d", sched_ret);
    Stop();
    return;
  }
  UDF_RUN_LOG_INFO("Process processor init event end.");
}
// Submits a kEventIdFlowFuncInit event (sub event id 0) to the main sched group.
// Returns FLOW_FUNC_SUCCESS, or the error code returned by SubmitEvent.
int32_t FlowFuncExecutor::ScheduleFlowFuncInit() const {
  UDF_RUN_LOG_INFO("start to submit init FlowFunc event.");
  const int32_t submit_ret =
      SubmitEvent(GlobalConfig::Instance().GetMainSchedGroupId(), UdfEvent::kEventIdFlowFuncInit, 0);
  if (submit_ret == FLOW_FUNC_SUCCESS) {
    return FLOW_FUNC_SUCCESS;
  }
  UDF_LOG_ERROR("Failed to submit flow func init event.");
  return submit_ret;
}
// Handles the single-flow-func init event: initializes the flow func of the processor whose index
// is carried in the sub event id and, on success, schedules it for execution.
// - An out-of-range index is logged and the event is dropped.
// - FLOW_FUNC_ERR_INIT_AGAIN re-submits the same event to the main sched group; a failed
//   re-submit is only logged.
// - Any other init failure, or a failed schedule, calls Stop().
void FlowFuncExecutor::ProcessSingleFlowFuncInitEvent(const struct event_info &event, uint32_t thread_idx) {
  const auto flow_func_processor_idx = static_cast<size_t>(event.comm.subevent_id);
  UDF_LOG_DEBUG("Flow func single flow func init event begin, flow_func_processor_idx=%zu, thread_idx=%u.",
                flow_func_processor_idx, thread_idx);
  if (flow_func_processor_idx >= func_processors_.size()) {
    // Name the event this handler actually serves (the previous text was copied from the execute handler).
    UDF_LOG_ERROR("SingleFlowFuncInit event invalid, flow_func_processor_idx=%zu.", flow_func_processor_idx);
    return;
  }

  auto flow_func_processor = func_processors_[flow_func_processor_idx];
  int32_t ret = flow_func_processor->InitFlowFunc();
  if (ret == FLOW_FUNC_ERR_INIT_AGAIN) {
    // Not ready yet: queue the same init event again instead of blocking this thread.
    ret = SubmitEvent(GlobalConfig::Instance().GetMainSchedGroupId(),
                      UdfEvent::kEventIdSingleFlowFuncInit, event.comm.subevent_id);
    if (ret != FLOW_FUNC_SUCCESS) {
      UDF_LOG_ERROR("Failed to submit single flow func init event, flow_func_processor_idx=%zu",
                    flow_func_processor_idx);
    }
    return;
  }
  if (ret != FLOW_FUNC_SUCCESS) {
    UDF_LOG_ERROR("flow_func_processor init flow func failed, flowFuncInfo=%s, ret=%d",
                  flow_func_processor->GetFlowFuncInfo().c_str(),
                  ret);
    Stop();
    return;
  }

  UDF_RUN_LOG_INFO("Start to schedule FlowFunc.");
  ret = ScheduleFlowFunc(flow_func_processor_idx);
  if (ret != FLOW_FUNC_SUCCESS) {
    UDF_LOG_ERROR("ScheduleFlowFunc failed, ret=%d, idx=%zu, flowFuncInfo=%s.",
                  ret,
                  flow_func_processor_idx,
                  flow_func_processor->GetFlowFuncInfo().c_str());
    Stop();
    return;
  }
  UDF_RUN_LOG_INFO("Single flow func init event process end");
}
// Handles the flow-func init event for all processors.
// - If any processor answers FLOW_FUNC_ERR_INIT_AGAIN, the init event is submitted again and the
//   handler returns without scheduling anything.
// - Otherwise every processor is scheduled and, when a request queue wrapper exists, pending
//   request messages are processed once.
// Every hard failure calls Stop().
void FlowFuncExecutor::ProcessFlowFuncInitEvent(const struct event_info &event, uint32_t thread_idx) {
  (void)event;
  UDF_RUN_LOG_INFO("FlowFunc init event start, thread_idx=%u.", thread_idx);

  uint32_t need_re_init_num = 0;
  for (const auto &processor : func_processors_) {
    const int32_t init_ret = processor->InitFlowFunc();
    if (init_ret == FLOW_FUNC_ERR_INIT_AGAIN) {
      ++need_re_init_num;
      continue;
    }
    if (init_ret != FLOW_FUNC_SUCCESS) {
      UDF_LOG_ERROR("flow_func_processor init flow func failed, flowFuncInfo=%s, ret=%d",
                    processor->GetFlowFuncInfo().c_str(),
                    init_ret);
      Stop();
      return;
    }
  }

  if (need_re_init_num > 0) {
    UDF_LOG_INFO("flow func need re init, need_re_init_num=%u.", need_re_init_num);
    const int32_t resubmit_ret = ScheduleFlowFuncInit();
    if (resubmit_ret != FLOW_FUNC_SUCCESS) {
      UDF_LOG_ERROR("schedule flow func init failed, ret=%d", resubmit_ret);
      Stop();
    }
    return;
  }

  UDF_RUN_LOG_INFO("Start to schedule FlowFunc.");
  for (size_t idx = 0UL; idx < func_processors_.size(); ++idx) {
    const int32_t sched_ret = ScheduleFlowFunc(idx);
    if (sched_ret != FLOW_FUNC_SUCCESS) {
      UDF_LOG_ERROR("ScheduleFlowFunc failed, ret=%d, idx=%zu, flowFuncInfo=%s.",
                    sched_ret,
                    idx,
                    func_processors_[idx]->GetFlowFuncInfo().c_str());
      Stop();
      return;
    }
  }

  ControlMessageType msg_type = ControlMessageType::kUnknow;
  UDF_RUN_LOG_INFO("request_queue_wrapper_ %d.", request_queue_wrapper_ == nullptr);
  if (request_queue_wrapper_ != nullptr) {
    // Request messages may already be queued; handle them once during init.
    if (ProcessRequestMessageQueue(msg_type) != FLOW_FUNC_SUCCESS) {
      SendMessageByResponseQueue(msg_type, FLOW_FUNC_FAILED);
      UDF_LOG_ERROR("Try process request message queue data during init procedure failed.");
      Stop();
      return;
    }
  }
  UDF_RUN_LOG_INFO("FlowFunc init event process end");
}
// Attaches every input queue in dev_input_queue_map_. For non-proxy queues it additionally sets
// the queue work mode to QUEUE_MODE_PULL and, unless the queue id is in flow_msg_queues_,
// subscribes the queue to the main sched group.
// Returns FLOW_FUNC_SUCCESS, or FLOW_FUNC_ERR_QUEUE_ERROR on the first attach/subscribe failure.
int32_t FlowFuncExecutor::SubscribeInputQueue() const {
  // Value-initialize the driver argument structs so no indeterminate bytes reach halQueueSet.
  QueueSetInputPara input_param = {};
  QueueSetInput input = {};
  for (const auto &dev_input : dev_input_queue_map_) {
    const uint32_t queue_device_id = dev_input.first;
    for (const auto &queue : dev_input.second) {
      const uint32_t queue_id = queue.first;
      const bool is_proxy_queue = queue.second;
      auto drv_ret = halQueueAttach(queue_device_id, queue_id, kAttachWaitTimeout);
      if (drv_ret != DRV_ERROR_NONE) {
        UDF_LOG_ERROR("attached input queue[%u] failed, ret[%d], queue_device_id[%u]", queue_id,
                      static_cast<int32_t>(drv_ret), queue_device_id);
        return FLOW_FUNC_ERR_QUEUE_ERROR;
      }
      if (is_proxy_queue) {
        // Proxy queues are attached only.
        continue;
      }
      input.queSetWorkMode.qid = queue_id;
      input.queSetWorkMode.workMode = static_cast<uint32_t>(QUEUE_MODE_PULL);
      input_param.inBuff = static_cast<void *>(&input);
      input_param.inLen = static_cast<uint32_t>(sizeof(QueueSetInput));
      // Setting the work mode is best effort: its result is deliberately ignored.
      (void)halQueueSet(queue_device_id, QUEUE_SET_WORK_MODE, &input_param);
      if (flow_msg_queues_.count(queue_id) > 0U) {
        // Queues listed in flow_msg_queues_ are not subscribed here.
        continue;
      }
      drv_ret = halQueueSubscribe(queue_device_id, queue_id, GlobalConfig::Instance().GetMainSchedGroupId(),
                                  static_cast<int32_t>(QUEUE_TYPE_SINGLE));
      if (drv_ret != DRV_ERROR_NONE) {
        UDF_LOG_ERROR("Failed to subscribe event for queue[%u], ret[%d].", queue_id,
                      static_cast<int32_t>(drv_ret));
        return FLOW_FUNC_ERR_QUEUE_ERROR;
      }
      UDF_LOG_INFO("subscribe input queue end, queue_id=%u.", queue_id);
    }
  }
  return FLOW_FUNC_SUCCESS;
}
// Unsubscribes every non-proxy input queue that is not listed in flow_msg_queues_.
// The driver result is ignored.
void FlowFuncExecutor::UnsubscribeInputQueue() const {
  for (const auto &dev_entry : dev_input_queue_map_) {
    const uint32_t queue_device_id = dev_entry.first;
    for (const auto &queue_entry : dev_entry.second) {
      const uint32_t queue_id = queue_entry.first;
      const bool is_proxy_queue = queue_entry.second;
      if ((flow_msg_queues_.count(queue_id) > 0U) || is_proxy_queue) {
        continue;
      }
      (void)halQueueUnsubscribe(queue_device_id, queue_id);
      UDF_LOG_INFO("unsubscribe input queue end, queue_id=%u.", queue_id);
    }
  }
}
// Removes the full-to-not-full subscription of every non-proxy output queue.
// The driver result is ignored.
void FlowFuncExecutor::UnsubscribeOutputQueue() const {
  for (const auto &dev_entry : dev_output_queue_map_) {
    const uint32_t queue_device_id = dev_entry.first;
    for (const auto &queue_entry : dev_entry.second) {
      const uint32_t queue_id = queue_entry.first;
      const bool is_proxy_queue = queue_entry.second;
      if (is_proxy_queue) {
        continue;
      }
      (void)halQueueUnsubF2NFEvent(queue_device_id, queue_id);
      UDF_LOG_INFO("unsubscribe output queue end, queue_id=%u.", queue_id);
    }
  }
}
// Attaches every output queue in dev_output_queue_map_ and, for non-proxy queues, subscribes the
// full-to-not-full event on the main sched group.
// Returns FLOW_FUNC_SUCCESS, or FLOW_FUNC_ERR_QUEUE_ERROR on the first driver failure.
int32_t FlowFuncExecutor::SubscribeOutputQueue() const {
  for (const auto &dev_entry : dev_output_queue_map_) {
    const uint32_t queue_device_id = dev_entry.first;
    for (const auto &queue_entry : dev_entry.second) {
      const uint32_t queue_id = queue_entry.first;
      const bool is_proxy_queue = queue_entry.second;

      const auto attach_ret = halQueueAttach(queue_device_id, queue_id, kAttachWaitTimeout);
      if (attach_ret != DRV_ERROR_NONE) {
        UDF_LOG_ERROR("attached output queue[%u] failed, ret[%d], queue_device_id[%u]", queue_id,
                      static_cast<int32_t>(attach_ret), queue_device_id);
        return FLOW_FUNC_ERR_QUEUE_ERROR;
      }
      if (is_proxy_queue) {
        continue;
      }

      const auto sub_ret =
          halQueueSubF2NFEvent(queue_device_id, queue_id, GlobalConfig::Instance().GetMainSchedGroupId());
      if (sub_ret != DRV_ERROR_NONE) {
        UDF_LOG_ERROR("Failed to subscribe F2NF event for queue[%u], ret[%d].", queue_id,
                      static_cast<int32_t>(sub_ret));
        return FLOW_FUNC_ERR_QUEUE_ERROR;
      }
      UDF_LOG_INFO("subscribe out queue F2NF end, queue_id=%u.", queue_id);
    }
  }
  return FLOW_FUNC_SUCCESS;
}
int32_t FlowFuncExecutor::SubscribeStatusOutputQueue() const {
for (const auto &status_output_queue : status_output_queue_map_) {
const int32_t queue_device_id = status_output_queue.first;
for (const auto queue_id : status_output_queue.second) {
const auto drv_ret = halQueueAttach(queue_device_id, queue_id, kAttachWaitTimeout);
if (drv_ret != DRV_ERROR_NONE) {
UDF_LOG_ERROR("attached status output queue[%u] failed, ret[%d], queue_device_id[%d]",
queue_id, static_cast<int32_t>(drv_ret), queue_device_id);
return FLOW_FUNC_ERR_QUEUE_ERROR;
}
UDF_LOG_INFO("attached status output queue[%u] success.", queue_id);
}
}
return FLOW_FUNC_SUCCESS;
}
// Handles the execute event: runs Schedule() on the processor selected by the sub event id and
// passes its result to DoScheduleFlowFunc. An out-of-range index is logged and dropped.
void FlowFuncExecutor::ProcessFlowFuncExecuteEvent(const struct event_info &event, uint32_t thread_idx) {
  const auto processor_idx = static_cast<size_t>(event.comm.subevent_id);
  UDF_LOG_DEBUG(
      "Flow func execute event begin, flow_func_processor_idx=%zu, thread_idx=%u.", processor_idx, thread_idx);
  if (processor_idx >= func_processors_.size()) {
    UDF_LOG_ERROR("FlowFuncExecute event invalid, flow_func_processor_idx=%zu.", processor_idx);
    return;
  }

  auto processor = func_processors_[processor_idx];
  const bool need_sched_again = processor->Schedule(thread_idx);
  DoScheduleFlowFunc(need_sched_again, processor_idx);
  UDF_LOG_DEBUG("Flow func execute event end.");
}
void FlowFuncExecutor::DoScheduleFlowFunc(bool is_need_sched, uint32_t flow_func_processor_idx) {
if (is_need_sched) {
(void)ScheduleFlowFunc(flow_func_processor_idx);
} else {
auto flow_func_processor = func_processors_[flow_func_processor_idx];
if (!(flow_func_processor->IsOk())) {
UDF_LOG_ERROR("Flow func executor will exit as flow_func_processor schedule failed, flowFuncInfo=%s.",
flow_func_processor->GetFlowFuncInfo().c_str());
Stop();
}
}
}
// Handles an empty-to-not-empty event; the sub event id is the queue id.
// - For the request queue: processes pending request messages; on failure sends a failure
//   response and stops the executor.
// - For an input queue mapped to a processor: notifies the processor and schedules it when asked.
// - Queues that are not mapped are skipped with a warning.
void FlowFuncExecutor::ProcessEmptyToNotEmptyEvent(const struct event_info &event, uint32_t thread_idx) {
  const uint32_t queue_id = event.comm.subevent_id;
  UDF_LOG_DEBUG("EmptyToNotEmptyEvent, queue_id=%u, thread_idx=%u.", queue_id, thread_idx);

  if (queue_id == GlobalConfig::Instance().GetReqQueueId()) {
    ControlMessageType msg_type = ControlMessageType::kUnknow;
    const auto proc_ret = ProcessRequestMessageQueue(msg_type);
    if (proc_ret != FLOW_FUNC_SUCCESS) {
      SendMessageByResponseQueue(msg_type, proc_ret);
      Stop();
      UDF_LOG_ERROR("Process request queue = %u message failed. Start to exit.", queue_id);
    }
    return;
  }

  const auto mapping = input_to_flow_func_processor_idx_.find(queue_id);
  if (mapping == input_to_flow_func_processor_idx_.cend()) {
    UDF_LOG_WARN("skip processing E2NE event for input queue[%u].", queue_id);
    return;
  }

  const auto flow_func_idx = mapping->second;
  if (flow_func_idx >= func_processors_.size()) {
    UDF_LOG_ERROR("flow_func_idx is invalid, queue_id=%u, flow_func_idx=%zu, funcProcessors size=%zu",
                  queue_id,
                  flow_func_idx,
                  func_processors_.size());
    return;
  }

  auto processor = func_processors_[flow_func_idx];
  if (processor->EmptyToNotEmpty()) {
    (void)ScheduleFlowFunc(flow_func_idx);
  }
}
// Dequeues one mbuf from the request queue and parses it into req_msg / msg_type.
// The mbuf is freed when this function returns, whether or not parsing succeeded.
// Returns FLOW_FUNC_ERR_QUEUE_EMPTY when the queue is empty, FLOW_FUNC_FAILED on any other
// dequeue error, otherwise the result of ParseRequestMessage.
int32_t FlowFuncExecutor::DequeueAndParseRequestMessage(ff::deployer::ExecutorRequest &req_msg,
                                                        RequestMsgType &msg_type) const {
  Mbuf *request_mbuf = nullptr;
  const auto dequeue_ret = request_queue_wrapper_->Dequeue(request_mbuf);
  if (dequeue_ret == HICAID_ERR_QUEUE_EMPTY) {
    UDF_LOG_INFO("Message queue turns to empty status.");
    return FLOW_FUNC_ERR_QUEUE_EMPTY;
  }
  if (dequeue_ret != HICAID_SUCCESS) {
    UDF_LOG_ERROR("Dequeue message in control queue failed, ret = %d.", dequeue_ret);
    return FLOW_FUNC_FAILED;
  }

  // Free the dequeued mbuf on scope exit.
  auto free_mbuf = [request_mbuf]() { (void)halMbufFree(request_mbuf); };
  ScopeGuard mbuf_guard(free_mbuf);
  return ParseRequestMessage(request_mbuf, req_msg, msg_type);
}
// Parses the payload of control_mbuf as an ExecutorRequest and classifies it.
// msg_type is written only on success:
//   - kControlMsg   when the request has a clear_model_message,
//   - kExceptionMsg when it has an exception_request,
//   - kNotify       when its type is ExecutorRequestType::kNotify.
// Returns FLOW_FUNC_SUCCESS on success, FLOW_FUNC_ERR_MEM_BUF_ERROR when the buffer address or
// length cannot be used, FLOW_FUNC_FAILED when protobuf parsing fails and
// FLOW_FUNC_ERR_PARAM_INVALID for any other kind of request.
int32_t FlowFuncExecutor::ParseRequestMessage(Mbuf *control_mbuf, ff::deployer::ExecutorRequest &req_msg,
                                              RequestMsgType &msg_type) const {
  void *data_ptr = nullptr;
  auto drv_ret = halMbufGetBuffAddr(control_mbuf, &data_ptr);
  if ((drv_ret != DRV_ERROR_NONE) || (data_ptr == nullptr)) {
    UDF_LOG_ERROR("Failed to get data or data is nullptr, ret[%d].", drv_ret);
    return FLOW_FUNC_ERR_MEM_BUF_ERROR;
  }
  uint64_t data_len = 0UL;
  drv_ret = halMbufGetDataLen(control_mbuf, &data_len);
  if ((drv_ret != DRV_ERROR_NONE) || (data_len == 0UL)) {
    UDF_LOG_ERROR("Failed to get data or data length is 0, ret[%d].", drv_ret);
    return FLOW_FUNC_ERR_MEM_BUF_ERROR;
  }
  // ArrayInputStream takes the size as a signed 32-bit int; reject lengths that would be
  // truncated or turn negative in the cast below.
  if (data_len > static_cast<uint64_t>(INT32_MAX)) {
    UDF_LOG_ERROR("Control message is too large to parse, data_len=%lu.", data_len);
    return FLOW_FUNC_ERR_MEM_BUF_ERROR;
  }
  google::protobuf::io::ArrayInputStream stream(data_ptr, static_cast<int32_t>(data_len));
  const auto parse_ret = req_msg.ParseFromZeroCopyStream(&stream);
  if (!parse_ret) {
    UDF_LOG_ERROR("Parse control message failed. data_len=%lu.", data_len);
    return FLOW_FUNC_FAILED;
  }
  if (req_msg.has_clear_model_message()) {
    msg_type = RequestMsgType::kControlMsg;
    UDF_LOG_DEBUG("Current message is control message.");
  } else if (req_msg.has_exception_request()) {
    msg_type = RequestMsgType::kExceptionMsg;
    UDF_LOG_DEBUG("Current message is exception message.");
  } else if (req_msg.type() == ff::deployer::ExecutorRequestType::kNotify) {
    msg_type = RequestMsgType::kNotify;
    UDF_LOG_DEBUG("Current message is notify message.");
  } else {
    UDF_LOG_ERROR("Request msg queue should only send control message, exception info and notify message.");
    return FLOW_FUNC_ERR_PARAM_INVALID;
  }
  return FLOW_FUNC_SUCCESS;
}
// Applies a clear-model control message to every processor.
// For kSuspend / kRecover: sets the global abnormal status, marks each processor, records its
// index in the matching pending-id set (under that set's mutex) and schedules the processor.
// Returns FLOW_FUNC_SUCCESS, the first ScheduleFlowFunc error, or FLOW_FUNC_ERR_PARAM_INVALID
// for any other message type.
int32_t FlowFuncExecutor::ProcessControlMsg(ff::deployer::ExecutorRequest_ClearModelRequest &ctrl_msg) {
  const auto clear_type = ctrl_msg.clear_msg_type();

  if (clear_type == static_cast<int32_t>(ControlMessageType::kSuspend)) {
    GlobalConfig::Instance().SetAbnormalStatus(true);
    UDF_LOG_INFO("Executor will send suspend for processor num[%zu].", func_processors_.size());
    for (size_t idx = 0UL; idx < func_processors_.size(); ++idx) {
      func_processors_[idx]->SetClearAndSuspend();
      // The lock is held until the end of this iteration, including the schedule call.
      std::unique_lock<std::mutex> guard(suspend_mutex_);
      (void)suspend_process_ids_.insert(static_cast<uint32_t>(idx));
      const auto sched_ret = ScheduleFlowFunc(idx);
      if (sched_ret != FLOW_FUNC_SUCCESS) {
        return sched_ret;
      }
    }
    return FLOW_FUNC_SUCCESS;
  }

  if (clear_type == static_cast<int32_t>(ControlMessageType::kRecover)) {
    GlobalConfig::Instance().SetAbnormalStatus(true);
    UDF_LOG_INFO("Executor will send recover for processor num[%zu].", func_processors_.size());
    for (size_t idx = 0UL; idx < func_processors_.size(); ++idx) {
      func_processors_[idx]->SetClearAndRecover();
      // The lock is held until the end of this iteration, including the schedule call.
      std::unique_lock<std::mutex> guard(recover_mutex_);
      (void)recover_process_ids_.insert(static_cast<uint32_t>(idx));
      const auto sched_ret = ScheduleFlowFunc(idx);
      if (sched_ret != FLOW_FUNC_SUCCESS) {
        return sched_ret;
      }
    }
    return FLOW_FUNC_SUCCESS;
  }

  UDF_LOG_ERROR("Invalid message type got from control queue. msg type :%d", ctrl_msg.clear_msg_type());
  return FLOW_FUNC_ERR_PARAM_INVALID;
}
// Dispatches a dataflow exception notification.
// - kAddException: copies the exception into a UdfExceptionInfo, forwards it to every func param,
//   then records it on the processors and schedules them.
// - kDeleteException: forwards the deletion to every func param and records it on the processors.
// Returns FLOW_FUNC_SUCCESS, FLOW_FUNC_FAILED when the context copy fails or the type is unknown,
// or the first ScheduleFlowFunc error.
int32_t FlowFuncExecutor::ProcessExceptionMsg(const ff::deployer::ExecutorRequest_DataflowExceptionNotify &exp_msg) {
  UDF_LOG_INFO("Executor will send exception info for processor num[%zu].", func_processors_.size());

  if (exp_msg.type() == static_cast<uint32_t>(ExceptionType::kAddException)) {
    UdfExceptionInfo exp_info = {};
    exp_info.exp_code = exp_msg.exception_code();
    exp_info.trans_id = exp_msg.trans_id();
    exp_info.user_context_id = exp_msg.user_context_id();
    const auto &context = exp_msg.exception_context();
    const auto copy_ret = memcpy_s(exp_info.exp_context, kMaxMbufHeadLen, context.data(), context.size());
    if (copy_ret != EOK) {
      UDF_LOG_ERROR("Copy exception info from message failed. ret=%d, trans_id[%lu], exp_code[%d],"
                    "user_context_id[%lu]", static_cast<int32_t>(copy_ret), exp_info.trans_id, exp_info.exp_code,
                    exp_info.user_context_id);
      return FLOW_FUNC_FAILED;
    }
    exp_info.exp_context_size = exp_msg.exception_context().size();

    // Every func param is notified; the handler's result does not affect the flow.
    for (size_t idx = 0UL; idx < func_params_.size(); ++idx) {
      (void)func_params_[idx]->HandleInvokedException(exp_msg.scope(), exp_msg.trans_id(), true);
    }

    for (size_t idx = 0UL; idx < func_processors_.size(); ++idx) {
      // NOTE(review): a scope mismatch ends the whole function, so later processors are not
      // visited. Presumably all processors of one executor share a scope - confirm.
      if (!(func_processors_[idx]->CheckSameScope(exp_msg.scope()))) {
        return FLOW_FUNC_SUCCESS;
      }
      func_processors_[idx]->RecordExceptionInfo(exp_info);
      const auto sched_ret = ScheduleFlowFunc(idx);
      if (sched_ret != FLOW_FUNC_SUCCESS) {
        UDF_LOG_ERROR("Schedule func failed for trans_id[%lu], exp_code[%d], user_context_id[%lu]",
                      exp_info.trans_id, exp_info.exp_code, exp_info.user_context_id);
        return sched_ret;
      }
    }
    return FLOW_FUNC_SUCCESS;
  }

  if (exp_msg.type() == static_cast<uint32_t>(ExceptionType::kDeleteException)) {
    for (size_t idx = 0UL; idx < func_params_.size(); ++idx) {
      (void)func_params_[idx]->HandleInvokedException(exp_msg.scope(), exp_msg.trans_id(), false);
    }
    for (size_t idx = 0UL; idx < func_processors_.size(); ++idx) {
      // Same early exit on scope mismatch as in the add-exception path.
      if (!(func_processors_[idx]->CheckSameScope(exp_msg.scope()))) {
        return FLOW_FUNC_SUCCESS;
      }
      func_processors_[idx]->RecordDeleteException(exp_msg.trans_id());
    }
    return FLOW_FUNC_SUCCESS;
  }

  UDF_LOG_ERROR("Invalid Type[%u] Get from exception request.", exp_msg.type());
  return FLOW_FUNC_FAILED;
}
// Drains the request queue until it reports empty.
// - Exception messages are processed and acknowledged through the response queue one by one.
// - Of all other messages only the first one per call is processed as a control message; later
//   ones are dequeued and skipped.
// ctrl_msg_type receives the type of the last message that was handled.
// Returns FLOW_FUNC_SUCCESS once the queue is empty, the dequeue/parse error, or FLOW_FUNC_FAILED
// when processing a message or sending the response fails.
int32_t FlowFuncExecutor::ProcessRequestMessageQueue(ControlMessageType &ctrl_msg_type) {
  bool control_msg_handled = false;
  for (;;) {
    ff::deployer::ExecutorRequest req_msg;
    RequestMsgType msg_type;
    const int32_t deq_ret = DequeueAndParseRequestMessage(req_msg, msg_type);
    if (deq_ret == FLOW_FUNC_ERR_QUEUE_EMPTY) {
      UDF_LOG_DEBUG("Now message queue is empty.");
      break;
    }
    if (deq_ret != FLOW_FUNC_SUCCESS) {
      UDF_LOG_ERROR("Parse request message failed.");
      return deq_ret;
    }

    if (msg_type == RequestMsgType::kExceptionMsg) {
      ctrl_msg_type = ControlMessageType::kException;
      const auto &exp_msg = req_msg.exception_request().exception_notify();
      if (ProcessExceptionMsg(exp_msg) != FLOW_FUNC_SUCCESS) {
        UDF_LOG_ERROR("Process exception msg failed.");
        return FLOW_FUNC_FAILED;
      }
      if (SendMessageByResponseQueue(ControlMessageType::kException, FLOW_FUNC_SUCCESS) != FLOW_FUNC_SUCCESS) {
        UDF_LOG_ERROR("Send response message in queue failed.");
        return FLOW_FUNC_FAILED;
      }
      continue;
    }

    // NOTE(review): every non-exception type, including RequestMsgType::kNotify, takes this
    // path - confirm notify messages are meant to be handled as control messages.
    if (control_msg_handled) {
      continue;
    }
    control_msg_handled = true;
    ff::deployer::ExecutorRequest_ClearModelRequest ctrl_msg = req_msg.clear_model_message();
    ctrl_msg_type = static_cast<ControlMessageType>(ctrl_msg.clear_msg_type());
    if (ProcessControlMsg(ctrl_msg) != FLOW_FUNC_SUCCESS) {
      UDF_LOG_ERROR("Process control msg failed.");
      return FLOW_FUNC_FAILED;
    }
  }
  return FLOW_FUNC_SUCCESS;
}
template<typename T>
int32_t FlowFuncExecutor::SerializeProtoToMbuf(const T &proto_msg, Mbuf *&mbuf_to_generate) {
const size_t rsp_size = proto_msg.ByteSizeLong();
const FillFunc fill_func = [&proto_msg](void *const buffer, const size_t size) {
if (proto_msg.SerializeToArray(buffer, static_cast<int32_t>(size))) {
return FLOW_FUNC_SUCCESS;
}
UDF_LOG_ERROR("Protobuf serializeToArray failed.");
return FLOW_FUNC_FAILED;
};
Mbuf *mbuf = nullptr;
int32_t ret = GenerateMbuf(rsp_size, fill_func, mbuf);
if ((ret != FLOW_FUNC_SUCCESS) || (mbuf == nullptr)) {
UDF_LOG_ERROR("generate mbuf failed, ret[%d].", ret);
return FLOW_FUNC_ERR_MEM_BUF_ERROR;
}
mbuf_to_generate = mbuf;
return FLOW_FUNC_SUCCESS;
}
int32_t FlowFuncExecutor::SendMessageByResponseQueue(const ControlMessageType &msg_type, const int32_t result) {
if (GlobalConfig::Instance().GetRspQueueId() == UINT32_MAX) {
UDF_LOG_INFO("There is not message queue in current version. skip to send message.");
return FLOW_FUNC_SUCCESS;
}
std::string msg;
const auto iter = kResponseMsgMap.find(msg_type);
if (iter != kResponseMsgMap.cend()) {
msg = iter->second;
} else {
msg = "Unknown operator " + std::to_string(static_cast<int32_t>(msg_type));
}
ff::deployer::ExecutorResponse response;
response.set_error_code(result);
if (result == FLOW_FUNC_SUCCESS) {
msg += " success.";
} else {
msg += " failed.";
}
response.set_error_message(msg);
Mbuf *mbuf = nullptr;
if (SerializeProtoToMbuf<ff::deployer::ExecutorResponse>(response, mbuf) != FLOW_FUNC_SUCCESS) {
UDF_LOG_ERROR("Serialize proto to mbuf failed.");
return FLOW_FUNC_FAILED;
}
UDF_LOG_INFO("Prepare to send response message.code[%d], msg[%s].", result, msg.c_str());
const auto ret = response_queue_wrapper_->Enqueue(mbuf);
if (ret != HICAID_SUCCESS) {
UDF_LOG_ERROR("Enqueue message buffer to response queue failed. queue_id = %u, ret = %d.",
GlobalConfig::Instance().GetRspQueueId(), ret);
(void)halMbufFree(mbuf);
return FLOW_FUNC_ERR_MEM_BUF_ERROR;
}
return FLOW_FUNC_SUCCESS;
}
// Handles a full-to-not-full event; the sub event id is the output queue id.
// Every processor mapped to that queue is notified and scheduled when it asks for it.
// An out-of-range processor index aborts the handling of this event.
void FlowFuncExecutor::ProcessFullToNoFullEvent(const struct event_info &event, uint32_t thread_idx) {
  const uint32_t queue_id = event.comm.subevent_id;
  UDF_LOG_DEBUG("FullToNoFullEvent, queue_id=%u, thread_idx=%u.", queue_id, thread_idx);

  const auto users_iter = output_to_flow_func_processor_idx_.find(queue_id);
  if (users_iter == output_to_flow_func_processor_idx_.cend()) {
    UDF_LOG_WARN("there is no flow func use output queue_id=%u", queue_id);
    return;
  }

  for (const auto flow_func_idx : users_iter->second) {
    if (flow_func_idx >= func_processors_.size()) {
      UDF_LOG_ERROR("flow_func_idx is invalid, queue_id=%u, flow_func_idx=%zu, flow_func_processor size=%zu",
                    queue_id,
                    flow_func_idx,
                    func_processors_.size());
      return;
    }
    auto processor = func_processors_[flow_func_idx];
    if (processor->FullToNotFull()) {
      (void)ScheduleFlowFunc(flow_func_idx);
    }
  }
}
// Handles a timer event: the sub event id is the timer id whose callback is executed.
void FlowFuncExecutor::ProcessTimerEvent(const struct event_info &event, uint32_t thread_idx) {
  const uint32_t fired_timer_id = event.comm.subevent_id;
  UDF_LOG_DEBUG("ProcessTimerEvent enter, timer_id=%u, thread_idx=%u.", fired_timer_id, thread_idx);
  FlowFuncTimer::Instance().ExecCallBack(fired_timer_id);
}
// Handles the notify-thread-exit event. The handler only logs; the event content is unused.
void FlowFuncExecutor::ProcessNotifyThreadExitEvent(const struct event_info &event, uint32_t thread_idx) {
  static_cast<void>(event);
  UDF_LOG_INFO("Thread[%u] receive notify thread exit event.", thread_idx);
}
// Fills exception_msg with a raise-exception status built from exception_info.
// current_scope:  scope string stored in the message.
// exception_info: source of trans id, exception code, user context id and context bytes
//                 (the first exp_context_size bytes of exp_context are copied).
// exception_msg:  output message; its msg_type is set to kRaiseExceptionMsgType.
void FlowFuncExecutor::ConstructException(const std::string &current_scope,
                                          const UdfExceptionInfo &exception_info,
                                          ff::deployer::SubmodelStatus &exception_msg) {
  exception_msg.set_msg_type(static_cast<uint32_t>(StatusQueueMsgType::kRaiseExceptionMsgType));
  auto exception_proto = exception_msg.mutable_exception_info();
  exception_proto->set_trans_id(exception_info.trans_id);
  exception_proto->set_exception_code(exception_info.exp_code);
  exception_proto->set_scope(current_scope);
  exception_proto->set_user_context_id(exception_info.user_context_id);
  exception_proto->set_exception_context(&exception_info.exp_context[0], exception_info.exp_context_size);
  UDF_LOG_INFO("Construct exception message: trans_id[%lu] exp_code[%d] scope[%s] success.",
               exception_info.trans_id, exception_info.exp_code, current_scope.c_str());
}
// Handles the raise-exception event: the sub event id selects the processor and the private
// message carries the 64-bit trans id. The processor is asked to write the exception status to
// its status output queue, using a generator that serializes the exception into an mbuf.
// An invalid index, an unexpected message length or a failed write stops the executor.
void FlowFuncExecutor::ProcessRaiseExceptionEvent(const struct event_info &event, uint32_t thread_idx) {
  auto flow_func_processor_idx = static_cast<size_t>(event.comm.subevent_id);
  UDF_LOG_DEBUG("Raise exception event begin, flow_func_processor_idx=%zu, thread_idx=%u.",
                flow_func_processor_idx, thread_idx);
  if (flow_func_processor_idx >= func_processors_.size()) {
    UDF_LOG_ERROR("Raise exception event invalid, flow_func_processor_idx=%zu.", flow_func_processor_idx);
    Stop();
    return;
  }
  // The payload must be exactly one uint64_t (the trans id).
  if (event.priv.msg_len != sizeof(uint64_t)) {
    UDF_LOG_ERROR("Get processor event msg length invalid, flow_func_processor_idx=%zu.", flow_func_processor_idx);
    Stop();
    return;
  }
  const uint64_t trans_id = *(reinterpret_cast<const uint64_t *>(event.priv.msg));
  // Generator passed to the processor: builds the SubmodelStatus message and serializes it.
  ReportExceptionMbufGenFunc mbuf_gen_func = [this](const std::string &current_scope,
                                                    const UdfExceptionInfo &exception_info,
                                                    Mbuf *&mbuf_to_generate) -> int32_t {
    ff::deployer::SubmodelStatus exception_msg;
    ConstructException(current_scope, exception_info, exception_msg);
    Mbuf *mbuf = nullptr;
    if (SerializeProtoToMbuf<ff::deployer::SubmodelStatus>(exception_msg, mbuf) != FLOW_FUNC_SUCCESS) {
      UDF_LOG_ERROR("Serialize proto to mbuf failed.");
      return FLOW_FUNC_FAILED;
    }
    mbuf_to_generate = mbuf;
    return FLOW_FUNC_SUCCESS;
  };
  if (func_processors_[flow_func_processor_idx]->WriteStatusOutputQueue(trans_id, mbuf_gen_func) != FLOW_FUNC_SUCCESS) {
    UDF_RUN_LOG_ERROR("Processor[%zu] report exception msg failed. Start to stop running.", flow_func_processor_idx);
    Stop();
    return;
  }
}
// Handles a "report status" event: validates the processor index carried in
// event.comm.subevent_id, then delegates to ReportStatus(). An out-of-range
// index or a failed report calls Stop().
void FlowFuncExecutor::ProcessReportStatusEvent(const struct event_info &event, uint32_t thread_idx) {
  const size_t processor_idx = static_cast<size_t>(event.comm.subevent_id);
  UDF_LOG_DEBUG("Report status event begin, flow_func_processor_idx=%zu, thread_idx=%u.",
                processor_idx, thread_idx);

  const bool idx_in_range = processor_idx < func_processors_.size();
  if (!idx_in_range) {
    UDF_LOG_ERROR("Report status event invalid, flow_func_processor_idx=%zu.", processor_idx);
    Stop();
    return;
  }

  if (ReportStatus(processor_idx) != FLOW_FUNC_SUCCESS) {
    UDF_LOG_ERROR("Report status failed, flow_func_processor_idx=%zu, thread_idx=%u.",
                  processor_idx, thread_idx);
    Stop();
    return;
  }

  UDF_LOG_DEBUG("Report status event end, flow_func_processor_idx=%zu, thread_idx=%u.",
                processor_idx, thread_idx);
}
int32_t FlowFuncExecutor::ReportStatus(const size_t flow_func_processor_idx) {
ReportStatusMbufGenFunc mbuf_gen_func = [this](const std::vector<QueueDevInfo> &input_queue_infos,
const uint32_t model_uuid,
const uint32_t input_consume_sum,
Mbuf *&mbuf_to_generate) -> int32_t {
ff::deployer::SubmodelStatus sub_model_status;
sub_model_status.set_model_uuid(model_uuid);
for (const auto input_queue_info : input_queue_infos) {
const auto input_queue_id = input_queue_info.queue_id;
uint32_t depth_value = UINT32_MAX;
QueueInfo info;
const auto drv_ret = halQueueQueryInfo(input_queue_info.device_id, input_queue_id, &info);
if (drv_ret != DRV_ERROR_NONE) {
UDF_LOG_WARN("query queue info failed, queue id[%u], device id[%u], ret[%d].",
input_queue_id, input_queue_info.device_id, static_cast<int32_t>(drv_ret));
} else {
depth_value = static_cast<size_t>(info.size);
}
auto queue_status = sub_model_status.add_queue_statuses();
queue_status->set_queue_depth(depth_value);
queue_status->set_input_consume_num(input_consume_sum);
auto queue_attrs = queue_status->mutable_queue_attrs();
queue_attrs->set_queue_id(input_queue_id);
queue_attrs->set_device_type(input_queue_info.device_type);
queue_attrs->set_device_id(input_queue_info.device_id);
queue_attrs->set_logic_id(input_queue_info.logic_queue_id);
}
sub_model_status.set_msg_type(static_cast<uint32_t>(StatusQueueMsgType::kReportStatusMsgType));
if (SerializeProtoToMbuf<ff::deployer::SubmodelStatus>(sub_model_status, mbuf_to_generate) != FLOW_FUNC_SUCCESS) {
UDF_LOG_ERROR("Serialize proto to mbuf failed.");
return FLOW_FUNC_FAILED;
}
UDF_LOG_INFO("Generate report status mbuf success, status[%s].",
sub_model_status.DebugString().c_str());
return FLOW_FUNC_SUCCESS;
};
return func_processors_[flow_func_processor_idx]->WriteStatusOutputQueue(mbuf_gen_func);
}
// Validates a processor-originated event and extracts the result code it carries.
//
// Returns FLOW_FUNC_PROCESSOR_PARAM_ERROR when the processor index
// (event.comm.subevent_id) is out of range or when the private message is not
// exactly sizeof(int32_t) bytes; the latter case additionally calls Stop().
// Otherwise returns the int32_t stored in the private message, logging an
// error when it is not FLOW_FUNC_SUCCESS.
int32_t FlowFuncExecutor::CheckProcessorEventParams(const struct event_info &event) {
  const size_t processor_idx = static_cast<size_t>(event.comm.subevent_id);

  const bool idx_in_range = processor_idx < func_processors_.size();
  if (!idx_in_range) {
    UDF_LOG_ERROR("Report processor id invalid, flow_func_processor_idx=%zu.", processor_idx);
    return FLOW_FUNC_PROCESSOR_PARAM_ERROR;
  }

  const bool len_matches = (event.priv.msg_len == sizeof(int32_t));
  if (!len_matches) {
    UDF_LOG_ERROR("Get processor event msg length invalid, flow_func_processor_idx=%zu.", processor_idx);
    Stop();
    return FLOW_FUNC_PROCESSOR_PARAM_ERROR;
  }

  // The payload is the processor's own result code.
  const auto *result_ptr = reinterpret_cast<const int32_t *>(event.priv.msg);
  const int32_t result = *result_ptr;
  if (result != FLOW_FUNC_SUCCESS) {
    UDF_LOG_ERROR("Processor event result is %d, flow_func_processor_idx=%zu.", result, processor_idx);
  }
  return result;
}
void FlowFuncExecutor::ProcessReportSuspendEvent(const struct event_info &event, uint32_t thread_idx) {
const auto flow_func_processor_idx = static_cast<size_t>(event.comm.subevent_id);
auto ret = CheckProcessorEventParams(event);
if (ret != FLOW_FUNC_SUCCESS) {
UDF_LOG_ERROR("Get processor:%zu suspend finished event is invalid. result:%d", flow_func_processor_idx, ret);
SendMessageByResponseQueue(ControlMessageType::kSuspend, ret);
Stop();
return;
}
std::unique_lock<std::mutex> lk(suspend_mutex_);
(void)suspend_process_ids_.erase(flow_func_processor_idx);
if (suspend_process_ids_.empty()) {
ret = FlowFuncManager::Instance().ResetFuncState();
if (ret != FLOW_FUNC_SUCCESS) {
FlowFuncManager::Instance().Reset();
for (auto &funcProcessor : func_processors_) {
funcProcessor->ReleaseFuncWrapper();
}
}
if (SendMessageByResponseQueue(ControlMessageType::kSuspend, FLOW_FUNC_SUCCESS) != FLOW_FUNC_SUCCESS) {
UDF_LOG_ERROR("Send response message in queue failed.");
Stop();
}
}
UDF_LOG_INFO("Process suspend finish event, flow_func_processor_idx=%zu, thread_idx=%u.",
flow_func_processor_idx, thread_idx);
}
// Handles a "recover finished" event from one processor.
//
// An invalid event sends the error code on the response queue as a kRecover
// message and calls Stop(). Otherwise the processor is removed from
// recover_process_ids_ under recover_mutex_; once the set is empty a kRecover
// success response is sent. The abnormal status flag is cleared on every valid
// event, regardless of whether the set is empty yet.
void FlowFuncExecutor::ProcessReportRecoverEvent(const struct event_info &event, uint32_t thread_idx) {
  const size_t processor_idx = static_cast<size_t>(event.comm.subevent_id);

  const int32_t check_ret = CheckProcessorEventParams(event);
  if (check_ret != FLOW_FUNC_SUCCESS) {
    UDF_LOG_ERROR("Get processor:%zu recover finished event is invalid. result:%d", processor_idx, check_ret);
    (void)SendMessageByResponseQueue(ControlMessageType::kRecover, check_ret);
    Stop();
    return;
  }

  // Held until the function returns.
  std::lock_guard<std::mutex> guard(recover_mutex_);
  (void)recover_process_ids_.erase(processor_idx);

  const bool all_recovered = recover_process_ids_.empty();
  if (all_recovered) {
    const bool response_sent =
        (SendMessageByResponseQueue(ControlMessageType::kRecover, FLOW_FUNC_SUCCESS) == FLOW_FUNC_SUCCESS);
    if (!response_sent) {
      UDF_LOG_ERROR("Send response message in queue failed.");
      Stop();
    }
  }

  GlobalConfig::Instance().SetAbnormalStatus(false);
  UDF_LOG_INFO("Process recover finish event end, flow_func_processor_idx=%zu, thread_idx=%u.",
               processor_idx, thread_idx);
}
// Handler for the "switch to soft sched mode" event. It only logs the event;
// neither the payload nor the thread index is used here.
void FlowFuncExecutor::ProcessSwitchSoftModeEvent(const struct event_info &event, uint32_t thread_idx) {
  UDF_LOG_INFO("Udf event switch soft sched mode.");
  static_cast<void>(event);
  static_cast<void>(thread_idx);
}
/**
 * Allocates an mbuf of req_size bytes, sets its data length, and lets
 * fill_func write the payload into its buffer.
 *
 * @param req_size         number of bytes to allocate and report as data length
 * @param fill_func        callback invoked as fill_func(buf_addr, req_size)
 * @param mbuf_to_generate receives the mbuf on success; left untouched on failure
 * @return FLOW_FUNC_SUCCESS on success, FLOW_FUNC_ERR_DRV_ERROR when a driver call
 *         fails, or the non-success code returned by fill_func
 */
int32_t FlowFuncExecutor::GenerateMbuf(const size_t req_size, const FillFunc &fill_func, Mbuf *&mbuf_to_generate) {
  Mbuf *mbuf = nullptr;
  auto drv_ret = halMbufAlloc(req_size, &mbuf);
  if ((drv_ret != DRV_ERROR_NONE) || (mbuf == nullptr)) {
    UDF_LOG_ERROR("Alloc mbuff failed, drv_ret=%d, dataSize=%zu.", static_cast<int32_t>(drv_ret), req_size);
    return FLOW_FUNC_ERR_DRV_ERROR;
  }
  // NOTE(review): mbuf_guard presumably runs the deleter (halMbufFree) on scope exit
  // unless ReleaseGuard() is called — confirm against ScopeGuard's definition.
  auto mbuf_deleter = [mbuf]() { (void)halMbufFree(mbuf); };
  ScopeGuard mbuf_guard(mbuf_deleter);

  drv_ret = halMbufSetDataLen(mbuf, req_size);
  if (drv_ret != DRV_ERROR_NONE) {
    UDF_LOG_ERROR("Mbuff set data length failed, drv_ret=%d, dataSize=%zu.", static_cast<int32_t>(drv_ret),
                  req_size);
    return FLOW_FUNC_ERR_DRV_ERROR;
  }

  void *buf_addr = nullptr;
  drv_ret = halMbufGetBuffAddr(mbuf, &buf_addr);
  if (drv_ret != DRV_ERROR_NONE || buf_addr == nullptr) {
    UDF_LOG_ERROR("Failed to get buff addr, ret[%d].", static_cast<int32_t>(drv_ret));
    return FLOW_FUNC_ERR_DRV_ERROR;
  }

  const auto fill_ret = fill_func(buf_addr, req_size);
  if (fill_ret != FLOW_FUNC_SUCCESS) {
    UDF_LOG_ERROR("Failed to fill mbuf data, ret[%d].", fill_ret);
    return fill_ret;
  }

  // Success: hand the mbuf to the caller instead of freeing it.
  mbuf_guard.ReleaseGuard();
  mbuf_to_generate = mbuf;
  return FLOW_FUNC_SUCCESS;
}
void FlowFuncExecutor::DumpMetrics(bool with_queue_info) const {
for (const auto &funcProcessor : func_processors_) {
funcProcessor->DumpFlowFuncInfo(with_queue_info);
funcProcessor->DumpModelMetrics(with_queue_info);
}
}
}