* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
#include "deploy/flowrm/flowgw_client.h"
#include <signal.h>
#include "mmpa/mmpa_api.h"
#include "common/utils/rts_api_utils.h"
#include "common/debug/log.h"
#include "common/subprocess/subprocess_manager.h"
#include "deploy/flowrm/tsd_client.h"
#include "deploy/deployer/deployer.h"
#include "acl/acl.h"
#include "common/df_chk.h"
namespace ge {
namespace {
const uint32_t kWaitTimeInMilliSecond = 1000U;
constexpr int32_t kForkQsWaitTimeInMilliSec = 2000;
constexpr int32_t kInnerForkQsWaitTimeInMilliSec = 5000;
constexpr int32_t kWaitTimeoutInSec = 30;
constexpr int32_t kDestroyHcomWaitTimeoutInSec = 40;
constexpr int32_t kFlowGwNotSupport = 20;
constexpr const char_t *kProtocolTypeRdma = "RDMA";
constexpr const char_t *kFlowGwProcessName = "queue_schedule";
constexpr uint32_t kBindAllDevice = 0xffffffff;
constexpr uint64_t kInvalidHcomHandle = UINT64_MAX;
}
FlowGwClient::FlowGwClient(uint32_t device_id, int32_t device_type, const std::vector<int32_t> &res_ids, bool is_proxy)
: pid_(0),
proc_status_(ProcStatus::NORMAL),
hcom_handle_(kInvalidHcomHandle),
device_id_(device_id),
device_type_(device_type),
is_proxy_(is_proxy),
is_inited_(false),
is_exception_(false),
res_ids_(res_ids) {
}
std::string FlowGwClient::FormatListParam(const std::vector<int32_t> &list) {
std::string param;
for (size_t i = 0U; i < list.size(); ++i) {
param += std::to_string(list[i]);
if (i != list.size() - 1U) {
param += ",";
}
}
return param;
}
Status FlowGwClient::InnerStartFlowGw(const ProcessParam ¶m) {
SubprocessManager::SubprocessConfig config{};
config.process_type = kFlowGwProcessName;
config.death_signal = SIGKILL;
config.args = {
kFlowGwProcessName,
std::string("--deviceId=") + std::to_string(param.device_id),
std::string("--vfId=") + std::to_string(param.vf_id),
std::string("--pid=") + std::to_string(getpid()),
std::string("--qsInitGroupName=") + param.group_name,
std::string("--schedPolicy=") + std::to_string(static_cast<uint64_t>(bqs::SchedPolicy::POLICY_SUB_BUF_EVENT)),
std::string("--starter=1"),
std::string("--resIds=") + FormatListParam(param.res_ids),
std::string("--devIds=") + FormatListParam(param.dev_ids),
};
GE_CHK_STATUS_RET(SubprocessManager::GetInstance().ForkSubprocess(config, pid_), "Failed to fork flowgw.");
GE_CHK_STATUS_RET(ge::MemoryGroupManager::GetInstance().MemGrpAddProc(param.group_name, pid_, false, true),
"Failed to add flowgw[%d] to memory group[%s].", pid_, param.group_name.c_str());
if (!param.dev_ids.empty()) {
rtBindHostpidInfo info{};
info.cpType = RT_DEVDRV_PROCESS_USER;
info.hostPid = static_cast<uint32_t>(getpid());
info.chipId = kBindAllDevice;
info.len = static_cast<uint32_t>(RT_PROCESS_SIGN_LENGTH);
const auto ret = memcpy_s(info.sign, static_cast<size_t>(RT_PROCESS_SIGN_LENGTH), &pid_, sizeof(pid_t));
GE_CHK_BOOL_RET_STATUS(ret == EOK, FAILED, "Failed to memcpy_s, ret = %d", ret);
GE_CHK_RT_RET(rtBindHostPid(info));
}
(void) mmSleep(kInnerForkQsWaitTimeInMilliSec);
std::function<void(const ProcStatus &)> excpt_handle_callback = [param, this](const ProcStatus &proc_status) {
GEEVENT("Queue schedule process status is %d.", static_cast<int32_t>(proc_status));
proc_status_ = proc_status;
};
SubprocessManager::GetInstance().RegExcptHandleCallback(pid_, excpt_handle_callback);
status_func_ = [this]() -> ProcStatus {
return proc_status_;
};
return SUCCESS;
}
int32_t FlowGwClient::KillProcess(pid_t pid, int32_t signal) {
return kill(pid, signal);
}
Status FlowGwClient::Shutdown(pid_t pid) {
int32_t status = 0;
if (KillProcess(pid, SIGTERM) == 0) {
GELOGI("kill queue schedule success, pid=%d", pid);
const pid_t ret = mmWaitPid(pid, &status, 0);
GELOGI("mmWaitPid queue schedule[%d] ret[%d] status[%d] finish", pid, ret, status);
return SUCCESS;
}
GELOGI("queue schedule[%d] stopping", pid);
uint32_t times = 0U;
while (times < 10U) {
const pid_t ret = mmWaitPid(pid, &status, M_WAIT_NOHANG);
if (ret != 0) {
GELOGI("mmWaitPid success, ret[%d] pid[%d] status[%d]", ret, pid, status);
return SUCCESS;
}
(void) mmSleep(100U);
++times;
}
GELOGI("queue schedule[%d] stopping timeout, ready to kill", pid);
if (KillProcess(pid, SIGKILL) != 0) {
const pid_t ret = mmWaitPid(pid, &status, 0);
GELOGI("mmWaitPid queue schedule[%d] ret[%d] status[%d] finish", pid, ret, status);
}
GELOGI("queue schedule[%d] stopped", pid);
return SUCCESS;
}
Status FlowGwClient::StartFlowGwFromTsd(uint32_t device_id, const std::string &group_name) {
GE_CHK_STATUS_RET(TsdClient::GetInstance().StartFlowGw(device_id, group_name, pid_),
"Failed to start flowgw, device_id = %d, group name = %s.",
device_id, group_name.c_str());
GE_CHK_STATUS_RET(MemoryGroupManager::GetInstance().RemoteMemGrpAddProc(device_id, group_name, pid_, false, true),
"Failed to add remote group, device_id = %d, group name = %s, pid = %d.",
device_id, group_name.c_str(), pid_);
(void) mmSleep(kForkQsWaitTimeInMilliSec);
status_func_ = [this]() -> ProcStatus {
ProcStatus stat = ProcStatus::NORMAL;
auto ret = TsdClient::GetInstance().GetProcStatus(device_id_, pid_, stat, kFlowGwProcessName);
if (ret == SUCCESS) {
return stat;
}
GELOGE(FAILED, "Failed to get flowgw status, device_id = %d, pid = %d.", device_id_, pid_);
return ProcStatus::EXITED;
};
return SUCCESS;
}
Status FlowGwClient::ToVisibleDeviceId(const std::vector<int32_t> &logical_device_ids,
std::vector<int32_t> &visible_device_ids) {
for (auto logical_device_id : logical_device_ids) {
int32_t visible_device_id = logical_device_id;
DF_CHK_ACL_RET(aclrtGetLogicDevIdByUserDevId(logical_device_id, &visible_device_id));
visible_device_ids.emplace_back(visible_device_id);
}
return SUCCESS;
}
Status FlowGwClient::StartFlowGw() {
if (is_proxy_) {
const auto &mem_group_name = MemoryGroupManager::GetInstance().GetRemoteMemGroupName(device_id_);
GE_CHK_STATUS_RET(StartFlowGwFromTsd(device_id_, mem_group_name),
"Failed to start flowgw from tsd, device_id = %d, memory group = %s.",
device_id_, mem_group_name.c_str());
return SUCCESS;
}
const auto &mem_group_name = MemoryGroupManager::GetInstance().GetQsMemGroupName();
const auto &node_config = Configurations::GetInstance().GetLocalNode();
ProcessParam param = {};
param.device_id = device_id_;
param.group_name = mem_group_name;
param.res_ids = res_ids_;
GE_CHK_STATUS_RET(ToVisibleDeviceId(node_config.proxy_device_ids, param.dev_ids),
"Failed to covert logical device id to visible device id");
GE_CHK_STATUS_RET(InnerStartFlowGw(param),
"Failed to start flowgw, device_id = %d, memory group = %s.",
device_id_, mem_group_name.c_str());
return SUCCESS;
}
Status FlowGwClient::Initialize() {
GE_CHK_STATUS_RET(StartFlowGw(), "Failed to start flowgw client, device_id = %d, is_proxy = %d.",
device_id_, is_proxy_);
is_inited_ = true;
GE_CHK_STATUS_RET(InitFlowGwClient(), "Failed to init flowgw client, pid = %d, device_id = %d, is_proxy = %d.",
pid_, device_id_, is_proxy_);
return SUCCESS;
}
Status FlowGwClient::InitFlowGwClient() const {
std::string proc_sign = "";
uint32_t count = 0;
auto dgw_client = bqs::DgwClient::GetInstance(device_id_, pid_, is_proxy_);
GE_CHECK_NOTNULL(dgw_client);
constexpr uint32_t kMaxInitTimes = 30U;
while (count++ < kMaxInitTimes) {
GEEVENT("Init datagw client begin, flowgw server pid=%d, "
"device id=%u, is_proxy = %d, number of initializations=%u.",
pid_, device_id_, static_cast<int32_t>(is_proxy_), count);
int32_t res = dgw_client->Initialize(pid_, proc_sign, is_proxy_, kWaitTimeoutInSec);
if (res == 0) {
GE_CHK_STATUS_RET(SetHcclProtocol(), "Failed to set hccl protocol");
GEEVENT("Init datagw client success, flowgw server pid=%u, device id=%u, number of initializations=%u.",
pid_, device_id_, count);
return SUCCESS;
}
(void) mmSleep(kWaitTimeInMilliSecond);
}
GELOGE(FAILED, "Init datagw client failed, dgw server pid=%d, device id=%u.", pid_, device_id_);
REPORT_INNER_ERR_MSG("E19999", "Init datagw client failed, dgw server pid=%d, device id=%u.", pid_, device_id_);
return FAILED;
}
Status FlowGwClient::SetHcclProtocol() const {
const auto &node_config = Configurations::GetInstance().GetLocalNode();
const auto &protocol = node_config.protocol;
bqs::ConfigInfo cfg = {};
cfg.cmd = bqs::ConfigCmd::DGW_CFG_CMD_SET_HCCL_PROTOCOL;
cfg.cfg.hcclProtocolCfg.protocol = protocol == kProtocolTypeRdma ?
bqs::HcclProtocolType::RDMA :
bqs::HcclProtocolType::TCP;
std::vector<int32_t> results;
auto dgw_client = bqs::DgwClient::GetInstance(device_id_, pid_, is_proxy_);
GE_CHECK_NOTNULL(dgw_client);
int32_t ret = dgw_client->UpdateConfig(cfg, results, kWaitTimeoutInSec);
GE_CHK_BOOL_RET_STATUS(ret == 0, FAILED,
"[Set][HcclProtocol] failed, ret = %d, hccl protocol = %u, device_id = %u.",
ret, static_cast<uint32_t>(cfg.cfg.hcclProtocolCfg.protocol), device_id_);
GELOGI("[Set][HcclProtocol] success, hccl protocol = %u, device_id = %u.",
static_cast<uint32_t>(cfg.cfg.hcclProtocolCfg.protocol), device_id_);
return SUCCESS;
}
Status FlowGwClient::WaitConfigEffect() {
auto dgw_client = bqs::DgwClient::GetInstance(device_id_, pid_, is_proxy_);
GE_CHECK_NOTNULL(dgw_client);
int32_t ret = dgw_client->WaitConfigEffect(0, kWaitTimeoutInSec);
GE_CHK_BOOL_RET_STATUS(ret == 0 || ret == kFlowGwNotSupport, FAILED,
"[Wait][ConfigEffect] failed, ret = %d, device_id = %u, pid = %d.", ret, device_id_, pid_);
return SUCCESS;
}
Status FlowGwClient::UpdateProfiling() const {
bqs::ConfigInfo cfg = {};
cfg.cmd = bqs::ConfigCmd::DGW_CFG_CMD_UPDATE_PROFILING;
cfg.cfg.profCfg.profMode = bqs::ProfilingMode::PROFILING_OPEN;
std::vector<int32_t> results;
auto dgw_client = bqs::DgwClient::GetInstance(device_id_, pid_, is_proxy_);
GE_CHECK_NOTNULL(dgw_client);
int32_t ret = dgw_client->UpdateConfig(cfg, results, kWaitTimeoutInSec);
GE_CHK_BOOL_RET_STATUS(ret == 0, FAILED,
"[Update][Profiling] failed, ret = %d, device_id = %u.", ret, device_id_);
GELOGI("[Update][Profiling] success, device_id = %u.", device_id_);
return SUCCESS;
}
Status FlowGwClient::CreateHcomHandle() {
auto dgw_client = bqs::DgwClient::GetInstance(device_id_, pid_, is_proxy_);
GE_CHECK_NOTNULL(dgw_client);
int32_t res = dgw_client->CreateHcomHandle(rank_table_, rank_id_, nullptr, hcom_handle_, kWaitTimeoutInSec);
GE_CHK_BOOL_RET_STATUS(res == 0, FAILED,
"Init datagw handle failed, rank_table is %s, device_id is %u, rank_id is %d",
rank_table_.c_str(), device_id_, rank_id_);
GELOGI("[Create][Hcomm] handle success, device id=%u, hcom_handle=%lu", device_id_, hcom_handle_);
return SUCCESS;
}
Status FlowGwClient::CreateFlowGwGroup(const std::vector<const ExchangeEndpoint *> &endpoint_list,
int32_t &group_id) const {
GE_CHK_BOOL_RET_STATUS(!endpoint_list.empty(), FAILED,
"[Create][Group] failed, endpoint list is empty, device id=%u.", device_id_);
std::vector<bqs::Endpoint> endpoints;
ToFlowGwEndpoints(endpoint_list, endpoints);
bqs::ConfigInfo cfg = {};
cfg.cmd = bqs::ConfigCmd::DGW_CFG_CMD_ADD_GROUP;
cfg.cfg.groupCfg.endpointNum = static_cast<uint32_t>(endpoints.size());
cfg.cfg.groupCfg.endpoints = const_cast<bqs::Endpoint *>(&endpoints[0]);
std::vector<int32_t> results;
auto dgw_client = bqs::DgwClient::GetInstance(device_id_, pid_, is_proxy_);
GE_CHECK_NOTNULL(dgw_client);
int32_t ret = dgw_client->UpdateConfig(cfg, results, kWaitTimeoutInSec);
GE_CHK_BOOL_RET_STATUS(ret == 0, FAILED,
"[Create][Group] failed, ret = %d, endpoint size=%zu, device id=%u.",
ret, endpoint_list.size(), device_id_);
group_id = cfg.cfg.groupCfg.groupId;
GELOGD("[Create][Group] success, endpoint size=%zu, device id=%u.", endpoint_list.size(), device_id_);
return SUCCESS;
}
Status FlowGwClient::DestroyFlowGwGroup(int32_t group_id) const {
bqs::ConfigInfo cfg = {};
cfg.cmd = bqs::ConfigCmd::DGW_CFG_CMD_DEL_GROUP;
cfg.cfg.groupCfg.groupId = group_id;
std::vector<int32_t> results;
auto dgw_client = bqs::DgwClient::GetInstance(device_id_, pid_, is_proxy_);
GE_CHECK_NOTNULL(dgw_client);
int32_t ret = dgw_client->UpdateConfig(cfg, results, kWaitTimeoutInSec);
GE_CHK_BOOL_RET_STATUS(ret == 0, FAILED,
"[Destroy][Group] failed, ret = %d, group_id = %d, device id = %u, datagw pid = %d.",
ret, group_id, device_id_, pid_);
GELOGI("[Destroy][Group] success, group_id = %d, device id = %u, datagw pid = %d.", group_id, device_id_, pid_);
return SUCCESS;
}
Status FlowGwClient::ClearFlowgwModelData(const std::set<uint32_t> &model_ids, const int32_t type) {
GE_IF_BOOL_EXEC(!is_inited_, return SUCCESS);
bqs::ConfigInfo cfg = {};
if (is_exception_ && type == EXCEPTION_HANDLE_STOP) {
(void) DestroyHcomHandle();
(void) Finalize();
return SUCCESS;
}
if (type == EXCEPTION_HANDLE_STOP) {
cfg.cmd = bqs::ConfigCmd::DGW_CFG_CMD_STOP_SCHEDULE;
} else {
cfg.cmd = bqs::ConfigCmd::DGW_CFG_CMD_CLEAR_AND_RESTART_SCHEDULE;
}
auto model_num = model_ids.size();
std::unique_ptr<int32_t[]> model_ids_ptr = MakeUnique<int32_t[]>(model_num);
auto index = 0;
for (auto &model_id : model_ids) {
model_ids_ptr[index++] = static_cast<int32_t>(model_id);
}
cfg.cfg.reDeployCfg.rootModelNum = model_num;
cfg.cfg.reDeployCfg.rootModelIdsAddr = static_cast<uint64_t>(reinterpret_cast<uintptr_t>((model_ids_ptr.get())));
std::vector<int32_t> results;
auto dgw_client = bqs::DgwClient::GetInstance(device_id_, pid_, is_proxy_);
GE_CHECK_NOTNULL(dgw_client);
int32_t ret = dgw_client->UpdateConfig(cfg, results, kWaitTimeoutInSec);
GE_CHK_BOOL_RET_STATUS(ret == 0, FAILED,
"[ClearModelExceptionData] failed, ret = %d, type = %d, device id = %u, datagw pid = %d.",
ret, type, device_id_, pid_);
GELOGI("[ClearModelExceptionData] success, type = %d, device id = %u, datagw pid = %d.", type, device_id_, pid_);
return SUCCESS;
}
Status FlowGwClient::GrantQueueForRoute(const std::pair<const ExchangeEndpoint *,
const ExchangeEndpoint *> &queue_route) const {
GELOGD("[Grant][Queue] for flowgw route.");
if ((queue_route.first->type == ExchangeEndpointType::kEndpointTypeExternalQueue ||
queue_route.first->type == ExchangeEndpointType::kEndpointTypeQueue) &&
queue_route.first->device_type == device_type_) {
GE_CHK_STATUS_RET(GrantQueue(queue_route.first->device_id,
queue_route.first->id,
pid_,
GrantType::kReadOnly),
"Grant src queue failed, endpoint=[%s], datagw pid = %d",
queue_route.first->DebugString().c_str(), pid_);
}
if ((queue_route.second->type == ExchangeEndpointType::kEndpointTypeExternalQueue ||
queue_route.second->type == ExchangeEndpointType::kEndpointTypeQueue) &&
queue_route.second->device_type == device_type_) {
GE_CHK_STATUS_RET(GrantQueue(queue_route.second->device_id,
queue_route.second->id,
pid_,
GrantType::kWriteOnly),
"Grant dst queue failed, endpoint=[%s], datagw pid = %d",
queue_route.second->DebugString().c_str(), pid_);
}
return SUCCESS;
}
void FlowGwClient::PrintFlowRoute(
const std::vector<std::pair<const ExchangeEndpoint *,
const ExchangeEndpoint *>> &queue_routes,
bool print_error) {
for (auto &queue_route : queue_routes) {
if (print_error) {
GELOGE(FAILED, "Flow route info, src endpoint = [%s], dst endpoint = [%s]",
queue_route.first->DebugString().c_str(), queue_route.second->DebugString().c_str());
} else {
GELOGI("Flow route info, src endpoint = [%s], dst endpoint = [%s]",
queue_route.first->DebugString().c_str(), queue_route.second->DebugString().c_str());
}
}
}
Status FlowGwClient::FillCommChannelAttrEndpoint(const ExchangeEndpoint &endpoint, bqs::Endpoint &ret) {
ret.type = bqs::EndpointType::COMM_CHANNEL;
ret.attr.channelAttr.handle = endpoint.hcom_handle;
ret.attr.channelAttr.localTagId = endpoint.tag_id;
ret.attr.channelAttr.localRankId = endpoint.rank_id;
ret.attr.channelAttr.peerTagId = endpoint.peer_tag_id;
ret.attr.channelAttr.peerRankId = endpoint.peer_rank_id;
ret.attr.channelAttr.localTagDepth = endpoint.depth;
ret.attr.channelAttr.peerTagDepth = endpoint.depth;
return SUCCESS;
}
bqs::Endpoint FlowGwClient::ToFlowGwEndpoint(const ExchangeEndpoint &endpoint) const {
bqs::Endpoint ret = {};
ret.resId = 0x8000 |
static_cast<uint16_t>(endpoint.device_id) |
(static_cast<uint16_t>(endpoint.device_type) << 14U);
ret.globalId = endpoint.index;
ret.modelId = endpoint.model_id;
ret.rootModelId = endpoint.root_model_id;
switch (endpoint.type) {
case ExchangeEndpointType::kEndpointTypeTag: {
(void) FillCommChannelAttrEndpoint(endpoint, ret);
break;
}
case ExchangeEndpointType::kEndpointTypeExternalQueue:
case ExchangeEndpointType::kEndpointTypeQueue: {
ret.type = bqs::EndpointType::MEM_QUEUE;
ret.attr.memQueueAttr.queueId = static_cast<int32_t>(endpoint.id);
if (device_type_ != endpoint.device_type) {
ret.attr.memQueueAttr.queueType = 1U;
}
break;
}
case ExchangeEndpointType::kEndpointTypeGroup: {
ret.type = bqs::EndpointType::GROUP;
ret.attr.groupAttr.groupId = static_cast<int32_t>(endpoint.id);
ret.peerNum = endpoint.instance_num;
ret.localId = endpoint.instance_idx;
ret.attr.groupAttr.policy = endpoint.is_dynamic_sched ? bqs::GroupPolicy::DYNAMIC : bqs::GroupPolicy::HASH;
break;
}
default: {
break;
}
}
return ret;
}
void FlowGwClient::ToFlowGwRoutes(
const std::vector<std::pair<const ExchangeEndpoint *,
const ExchangeEndpoint *>> &queue_routes,
std::vector<bqs::Route> &flowgw_routes) const {
for (const auto &route : queue_routes) {
bqs::Route flowgw_route = {};
flowgw_route.src = ToFlowGwEndpoint(*route.first);
flowgw_route.dst = ToFlowGwEndpoint(*route.second);
flowgw_routes.emplace_back(flowgw_route);
}
}
void FlowGwClient::ToFlowGwEndpoints(
const std::vector<const ExchangeEndpoint *> &endpoints,
std::vector<bqs::Endpoint> &flowgw_endpoints) const {
for (const auto &endpoint : endpoints) {
bqs::Endpoint flowgw_endpoint = {};
flowgw_endpoint = ToFlowGwEndpoint(*endpoint);
flowgw_endpoints.emplace_back(flowgw_endpoint);
}
}
Status FlowGwClient::BindQueues(
const std::vector<std::pair<const ExchangeEndpoint *,
const ExchangeEndpoint *>> &queue_routes) const {
GELOGD("[Bind][Queues] device_id = %u, routes size = %zu", device_id_, queue_routes.size());
for (size_t i = 0U; i < queue_routes.size(); ++i) {
const auto &queue_route = queue_routes[i];
GE_CHK_STATUS_RET(GrantQueueForRoute(queue_route),
"[Grant][Queue] failed, src=[%s], dst=[%s].",
queue_route.first->DebugString().c_str(), queue_route.second->DebugString().c_str());
}
std::vector<bqs::Route> routes;
ToFlowGwRoutes(queue_routes, routes);
bqs::ConfigInfo cfg = {};
cfg.cmd = bqs::ConfigCmd::DGW_CFG_CMD_BIND_ROUTE;
cfg.cfg.routesCfg.routeNum = routes.size();
cfg.cfg.routesCfg.routes = const_cast<bqs::Route *>(routes.data());
std::vector<int32_t> bind_results;
auto dgw_client = bqs::DgwClient::GetInstance(device_id_, pid_, is_proxy_);
GE_CHECK_NOTNULL(dgw_client);
int32_t ret = dgw_client->UpdateConfig(cfg, bind_results, kWaitTimeoutInSec);
PrintFlowRoute(queue_routes, ret != 0);
GE_CHK_BOOL_RET_STATUS(ret == 0, FAILED,
"[Bind][Route] failed, ret = %d, device_id = %u, route size = %zu.",
ret, device_id_, queue_routes.size());
GELOGI("[Bind][Route] success, route size = %zu.", queue_routes.size());
return SUCCESS;
}
Status FlowGwClient::UnbindQueues(
const std::vector<std::pair<const ExchangeEndpoint *,
const ExchangeEndpoint *>> &queue_routes) const {
std::vector<bqs::Route> routes;
ToFlowGwRoutes(queue_routes, routes);
bqs::ConfigInfo cfg = {};
cfg.cmd = bqs::ConfigCmd::DGW_CFG_CMD_UNBIND_ROUTE;
cfg.cfg.routesCfg.routeNum = routes.size();
cfg.cfg.routesCfg.routes = const_cast<bqs::Route *>(routes.data());
std::vector<int32_t> results;
auto dgw_client = bqs::DgwClient::GetInstance(device_id_, pid_, is_proxy_);
GE_CHECK_NOTNULL(dgw_client);
int32_t ret = dgw_client->UpdateConfig(cfg, results, kWaitTimeoutInSec);
PrintFlowRoute(queue_routes, ret != 0);
GE_CHK_BOOL_RET_STATUS(ret == 0, FAILED,
"[Unbind][Route] failed, ret = %d, device_id = %u, route size = %zu.",
ret, device_id_, queue_routes.size());
GELOGI("[Unbind][Route] success, route size = %zu.", queue_routes.size());
return SUCCESS;
}
ExchangeEndpoint *FlowGwClient::GetEndpoint(std::map<int32_t, std::shared_ptr<ExchangeEndpoint>> &endpoints,
int32_t index) const {
auto it = endpoints.find(index);
if (it == endpoints.end()) {
GELOGE(PARAM_INVALID, "Failed to get endpoint by index: %d", index);
return nullptr;
}
return it->second.get();
}
bool FlowGwClient::IsExceptionGroup(const ExchangeEndpoint *group_endpoint,
std::map<int32_t, std::shared_ptr<ExchangeEndpoint>> &endpoints) const {
for (auto indice : group_endpoint->endpoint_indices) {
auto endpoint = GetEndpoint(endpoints, indice);
GE_CHECK_NOTNULL(endpoint);
if (endpoint->is_del) {
return true;
}
}
return false;
}
Status FlowGwClient::UpdateGroupRoute(const ExchangeEndpoint *group_endpoint,
std::map<int32_t, std::shared_ptr<ExchangeEndpoint>> &endpoints) const {
GELOGI("[UpdateExceptionRoutes] try update group, endpoint = %s.", group_endpoint->DebugString().c_str());
std::vector<int32_t> new_endpoint_indices;
for (auto indice : group_endpoint->endpoint_indices) {
auto endpoint = GetEndpoint(endpoints, indice);
GE_CHECK_NOTNULL(endpoint);
if (!endpoint->is_del) {
new_endpoint_indices.emplace_back(indice);
}
}
if (new_endpoint_indices.size() == group_endpoint->endpoint_indices.size()) {
GELOGI("[UpdateExceptionRoutes] group not need update.");
return SUCCESS;
}
GELOGI("[UpdateExceptionRoutes] delete old group, group id = %u.", group_endpoint->id);
GE_CHK_STATUS_RET(DestroyFlowGwGroup(static_cast<int32_t>(group_endpoint->id)),
"[UpdateExceptionRoutes] Failed to destroy flowgw old group.");
int32_t group_id = 0;
std::vector<const ExchangeEndpoint *> endpoint_list;
for (const auto index : new_endpoint_indices) {
auto endpoint = GetEndpoint(endpoints, index);
GE_CHECK_NOTNULL(endpoint);
endpoint_list.emplace_back(endpoint);
}
GE_CHK_STATUS_RET(CreateFlowGwGroup(endpoint_list, group_id),
"[UpdateExceptionRoutes] Failed to create flowgw new group.");
GELOGI("[UpdateExceptionRoutes] create new group, group id = %d.", group_id);
auto mutable_group_endpoint = GetEndpoint(endpoints, group_endpoint->index);
GE_CHECK_NOTNULL(mutable_group_endpoint);
mutable_group_endpoint->id = static_cast<uint32_t>(group_id);
mutable_group_endpoint->endpoint_indices = new_endpoint_indices;
return SUCCESS;
}
Status FlowGwClient::UpdateExceptionRoutes(
const std::vector<std::pair<const ExchangeEndpoint *,
const ExchangeEndpoint *>> &queue_routes,
std::map<int32_t, std::shared_ptr<ExchangeEndpoint>> &endpoints) const {
for (auto &queue_route : queue_routes) {
auto &src = queue_route.first;
auto &dst = queue_route.second;
GELOGI("[UpdateExceptionRoutes] start handle exception route, src endpoint = %s, dst endpoint = %s,"
" flowgw client info: device_id = %u, device_type = %d.",
src->DebugString().c_str(), dst->DebugString().c_str(), device_id_, device_type_);
if (src->is_del || dst->is_del) {
GELOGI("[UpdateExceptionRoutes] dst endpoint or src endpoint need delete, unbind route.");
GE_CHK_STATUS_RET(UnbindQueues({queue_route}), "[UpdateExceptionRoutes] Failed to bind routes.");
continue;
}
if (dst->type == ExchangeEndpointType::kEndpointTypeGroup) {
if ((src->type != ExchangeEndpointType::kEndpointTypeGroup) ||
((src->type == ExchangeEndpointType::kEndpointTypeGroup) && !IsExceptionGroup(src, endpoints))) {
GELOGI("[UpdateExceptionRoutes] dst is a group and src is normal, don't need update group, dynaimic process.");
continue;
}
}
GELOGI("[UpdateExceptionRoutes] dst endpoint or src endpoint need update group, unbind route first.");
GE_CHK_STATUS_RET(UnbindQueues({queue_route}), "[UpdateExceptionRoutes] Failed to unbind old route.");
if (src->type == ExchangeEndpointType::kEndpointTypeGroup) {
GE_CHK_STATUS_RET(UpdateGroupRoute(src, endpoints),
"[UpdateExceptionRoutes] Failed to update src group route.");
}
GELOGI("[UpdateExceptionRoutes] bind new route after update group, src endpoint = %s, dst endpoint = %s,",
src->DebugString().c_str(), dst->DebugString().c_str());
GE_CHK_STATUS_RET(BindQueues({queue_route}), "Failed to bind new route.");
}
return SUCCESS;
}
Status FlowGwClient::GrantQueue(uint32_t device_id, uint32_t qid, pid_t pid, GrantType grant_type) {
rtMemQueueShareAttr_t attr = {};
GE_CHK_BOOL_RET_STATUS(grant_type == GrantType::kReadOnly ||
grant_type == GrantType::kWriteOnly ||
grant_type == GrantType::kReadAndWrite,
FAILED,
"[Grant][Queue] type[%d] error.", static_cast<int32_t>(grant_type));
if (grant_type == GrantType::kReadOnly) {
attr.read = 1;
} else if (grant_type == GrantType::kWriteOnly) {
attr.write = 1;
} else {
attr.read = 1;
attr.write = 1;
}
GE_CHK_RT_RET(rtMemQueueGrant(device_id, qid, pid, &attr));
return SUCCESS;
}
Status FlowGwClient::Finalize() {
if (!is_inited_) {
GELOGI("FlowGw not start.");
return SUCCESS;
}
GELOGI("FlowGw shutdown begin.");
if (!is_proxy_) {
SubprocessManager::GetInstance().UnRegExcptHandleCallback(pid_);
GE_CHK_STATUS(Shutdown(pid_));
} else {
TsdClient::GetInstance().ShutdownSubprocess(static_cast<int32_t>(device_id_), pid_, kFlowGwProcessName);
}
is_inited_ = false;
GELOGI("FlowGw shutdown success.");
return SUCCESS;
}
Status FlowGwClient::DestroyHcomHandle() {
if (hcom_handle_ == kInvalidHcomHandle) {
return SUCCESS;
}
GEEVENT("[Destroy][Handle] begin, hcom_handle = %lu, device_id = %u, pid = %d", hcom_handle_, device_id_, pid_);
auto dgw_client = bqs::DgwClient::GetInstance(device_id_, pid_, is_proxy_);
GE_CHECK_NOTNULL(dgw_client);
const int32_t ret = dgw_client->DestroyHcomHandle(hcom_handle_, kDestroyHcomWaitTimeoutInSec);
GE_CHK_BOOL_RET_STATUS(ret == 0, FAILED,
"[Destroy][Handle] failed, hcom_handle=%lu, device id=%u",
hcom_handle_, device_id_);
hcom_handle_ = kInvalidHcomHandle;
GEEVENT("[Destroy][Handle] success, hcom_handle=%lu, device id=%u, pid = %d", hcom_handle_, device_id_, pid_);
return SUCCESS;
}
Status FlowGwClient::GetOrCreateHcomHandle(uint64_t &hcom_handle) {
if (hcom_handle_ == kInvalidHcomHandle) {
GE_CHK_STATUS_RET(CreateHcomHandle(), "Failed to create hcom handle");
}
hcom_handle = hcom_handle_;
return SUCCESS;
}
ProcStatus FlowGwClient::GetSubProcStat() const {
GE_IF_BOOL_EXEC(!is_inited_ || is_exception_, return ProcStatus::INVALID);
return status_func_();
}
void FlowGwClient::SetExceptionFlag() {
is_exception_ = true;
}
Status FlowGwClient::ConfigSchedInfoToDataGw(const uint32_t device_id, const int32_t input_indice,
const uint32_t input, const uint32_t output,
const uint32_t root_model_id, const bool is_proxy) const {
auto dgw_client = bqs::DgwClient::GetInstance(device_id_, pid_, is_proxy_);
GE_CHECK_NOTNULL(dgw_client);
GE_CHK_STATUS_RET(GrantQueue(device_id, input, pid_, GrantType::kReadOnly),
"DynamicSched Grant src queue failed, device id=%u, input queue id=%d, datagw pid=%d",
device_id, input, pid_);
GE_CHK_STATUS_RET(GrantQueue(device_id, output, pid_, GrantType::kWriteOnly),
"DynamicSched Grant src queue failed, device id=%u, output queue id=%d, datagw pid=%d",
device_id, output, pid_);
GELOGI("DynamicSched Grant src queue succ, device id=%u, input queue id=%d, output queue id=%d, datagw pid=%d",
device_id, input, output, pid_);
bqs::ConfigInfo cfg = {};
bqs::DynamicSchedConfigV2 dynamic_sched_cfg = {};
dynamic_sched_cfg.rootModelId = root_model_id;
dynamic_sched_cfg.requestQ.queueId = output;
dynamic_sched_cfg.requestQ.deviceId = device_id;
dynamic_sched_cfg.requestQ.deviceType = device_type_;
dynamic_sched_cfg.requestQ.isClientQ = is_proxy;
dynamic_sched_cfg.responseQ.queueId = input;
dynamic_sched_cfg.responseQ.deviceId = device_id;
dynamic_sched_cfg.responseQ.deviceType = device_type_;
dynamic_sched_cfg.responseQ.globalLogicId = input_indice;
dynamic_sched_cfg.responseQ.isClientQ = is_proxy;
cfg.cmd = bqs::ConfigCmd::DGW_CFG_CMD_INIT_DYNAMIC_SCHEDULE;
cfg.cfg.dynamicSchedCfgV2 = &dynamic_sched_cfg;
std::vector<int32_t> results;
int32_t ret = dgw_client->UpdateConfig(cfg, results, kWaitTimeoutInSec);
if (ret != 0) {
GELOGE(FAILED, "DynamicSched cfg failed, ret = %d, device id=%u, input queue id=%u, output queue id=%u,"
" datagw pid=%d.", ret, device_id, input, output, pid_);
REPORT_INNER_ERR_MSG("E19999", "DynamicSched cfg failed, ret = %d, device id=%u, input queue id=%u, "
"output queue id=%u, datagw pid=%d.", ret, device_id, input, output, pid_);
return FAILED;
}
GELOGD("DynamicSched cfg succ, root_model_id=%u, device id=%u, input queue id=%u, output queue id=%u,"
" datagw pid=%d, is_proxy=%d.", root_model_id, device_id, input, output, pid_, is_proxy);
return SUCCESS;
}
}