* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
#include "deploy/execfwk/pne_executor_client.h"
#include "common/debug/ge_log.h"
#include "deploy/flowrm/flowgw_client.h"
#include "common/config/configurations.h"
#include "common/data_flow/queue/heterogeneous_exchange_service.h"
#include "common/utils/rts_api_utils.h"
namespace ge {
namespace {
constexpr uint32_t kBindAllDevice = 0xffffffff;
}
PneExecutorClient::PneExecutorClient(int32_t device_id) : device_id_(device_id) {
}
void PneExecutorClient::SetContext(const ClientContext& context) {
context_ = context;
}
const DeviceMaintenanceClientCfg *PneExecutorClient::GetDevMaintenanceCfg() const {
return context_.dev_maintenance_cfg;
}
int32_t PneExecutorClient::GetDeviceId() const {
return device_id_;
}
int32_t PneExecutorClient::GetProcessId() const {
return context_.process_id;
}
int32_t PneExecutorClient::GetDeployerPid() const {
return context_.deployer_pid;
}
const PneExecutorClient::ClientContext &PneExecutorClient::GetContext() const {
return context_;
}
bool PneExecutorClient::SupportSyncVarManager() {
return true;
}
Status PneExecutorClient::GrantQueuesForProcess(int32_t use_queue_pid, int32_t use_queue_process_device_type,
const deployer::ExecutorRequest_ModelQueuesAttrs &model_queues_attrs) {
std::vector<DeployQueueAttr> queue_attrs;
bool need_bind_host_pid = false;
for (const auto &input_queue : model_queues_attrs.input_queues_attrs()) {
if (use_queue_process_device_type == input_queue.device_type() && input_queue.queue_id() != UINT32_MAX) {
DeployQueueAttr queue_attr = {};
queue_attr.queue_id = input_queue.queue_id();
queue_attr.device_type = input_queue.device_type();
queue_attr.device_id = input_queue.device_id();
queue_attrs.emplace_back(queue_attr);
}
if ((use_queue_process_device_type == static_cast<int32_t>(CPU)) &&
(use_queue_process_device_type != input_queue.device_type())) {
need_bind_host_pid = true;
}
}
for (const auto &output_queue : model_queues_attrs.output_queues_attrs()) {
if (use_queue_process_device_type == output_queue.device_type() && output_queue.queue_id() != UINT32_MAX) {
DeployQueueAttr queue_attr = {};
queue_attr.queue_id = output_queue.queue_id();
queue_attr.device_type = output_queue.device_type();
queue_attr.device_id = output_queue.device_id();
queue_attrs.emplace_back(queue_attr);
}
if ((use_queue_process_device_type == static_cast<int32_t>(CPU)) &&
(use_queue_process_device_type != output_queue.device_type())) {
need_bind_host_pid = true;
}
}
GE_CHK_STATUS_RET(DoGrantQueues(use_queue_pid, queue_attrs), "Failed to grant queues, pid = %d.", use_queue_pid);
if (need_bind_host_pid) {
GE_CHK_STATUS_RET(DoBindHostPid(use_queue_pid), "Failed to bind host pid[%d].", use_queue_pid);
}
return SUCCESS;
}
Status PneExecutorClient::BindHostPid(const int32_t pid) const {
rtBindHostpidInfo info{};
info.cpType = RT_DEVDRV_PROCESS_USER;
uint32_t host_pid = static_cast<uint32_t>(getpid());
info.hostPid = host_pid;
info.chipId = kBindAllDevice;
info.len = static_cast<uint32_t>(RT_PROCESS_SIGN_LENGTH);
const auto ret = memcpy_s(info.sign, static_cast<size_t>(RT_PROCESS_SIGN_LENGTH), &pid, sizeof(pid_t));
GE_CHK_BOOL_RET_STATUS(ret == EOK, FAILED, "Failed to memcpy_s, ret = %d", ret);
GE_CHK_RT_RET(rtBindHostPid(info));
return SUCCESS;
}
PneExecutorClientFactory &PneExecutorClientFactory::GetInstance() {
static PneExecutorClientFactory instance;
return instance;
}
std::string PneExecutorClientFactory::GenerateClientKey(const std::string &engine_name,
bool is_proxy) const {
std::string key = engine_name + "_" + std::to_string(static_cast<uint32_t>(is_proxy));
return key;
}
void PneExecutorClientFactory::RegisterCreateFunc(const std::string &engine_name,
bool is_proxy,
PneExecutorClientFactory::CreateFunc func) {
std::string key = GenerateClientKey(engine_name, is_proxy);
create_funcs_[key] = std::move(func);
}
std::unique_ptr<PneExecutorClient> PneExecutorClientFactory::CreateClient(const std::string &engine_name,
bool is_proxy,
int32_t device_id) {
std::string key = GenerateClientKey(engine_name, is_proxy);
auto func = create_funcs_[key];
if (func == nullptr) {
GELOGE(UNSUPPORTED, "Unsupported client, engine type = %s, is_proxy = %d",
engine_name.c_str(), is_proxy);
return nullptr;
}
return func(device_id);
}
}