* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
#include "router_server.h"
#include <map>
#include <csignal>
#include <sstream>
#include <securec.h>
#include <sys/types.h>
#include <unistd.h>
#include "statistic_manager.h"
#include "bqs_log.h"
#include "common/bqs_util.h"
#include "hccl_process.h"
#include "hccl/hccl_so_manager.h"
#include "common/type_def.h"
#include "queue_manager.h"
#include "queue_schedule_hal_interface_ref.h"
#include "qs_interface_process.h"
#include "queue_schedule_sub_module_interface.h"
#include "entity_manager.h"
namespace bqs {
namespace {
const std::string PIPELINE_QUEUE_NAME = "QsPipeQueue";
constexpr const uint16_t MAJOR_VERSION = 3U;
constexpr const uint16_t MINOR_VERSION = 0U;
constexpr const QueueShareAttr ADMIN_QUEUE_ATTR = {1U, 1U, 1U, 0U};
constexpr const char_t *ROUTER_SERVER_THREAD_NAME_PREFIX = "router_server";
const std::map<int32_t, int32_t> g_reqRspMapping = {
{AICPU_BIND_QUEUE, AICPU_BIND_QUEUE_RES},
{AICPU_BIND_QUEUE_INIT, AICPU_BIND_QUEUE_INIT_RES},
{AICPU_UNBIND_QUEUE, AICPU_UNBIND_QUEUE_RES},
{AICPU_QUERY_QUEUE, AICPU_QUERY_QUEUE_RES},
{AICPU_QUERY_QUEUE_NUM, AICPU_QUERY_QUEUE_NUM_RES}
};
const std::map<uint32_t, QsOperType> g_qsOperation = {
{static_cast<uint32_t>(AICPU_BIND_QUEUE_INIT), QsOperType::BIND_INIT},
{static_cast<uint32_t>(AICPU_BIND_QUEUE_INIT), QsOperType::BIND_INIT},
{static_cast<uint32_t>(AICPU_QUERY_QUEUE_NUM), QsOperType::QUERY_NUM},
{static_cast<uint32_t>(AICPU_QUEUE_RELATION_PROCESS), QsOperType::RELATION_PROCESS},
{static_cast<uint32_t>(ACL_BIND_QUEUE_INIT), QsOperType::BIND_INIT},
{static_cast<uint32_t>(ACL_BIND_QUEUE), QsOperType::RELATION_PROCESS},
{static_cast<uint32_t>(ACL_UNBIND_QUEUE), QsOperType::RELATION_PROCESS},
{static_cast<uint32_t>(ACL_QUERY_QUEUE_NUM), QsOperType::QUERY_NUM},
{static_cast<uint32_t>(ACL_QUERY_QUEUE), QsOperType::RELATION_PROCESS},
{static_cast<uint32_t>(DGW_CREATE_HCOM_HANDLE), QsOperType::CREATE_HCOM_HANDLE},
{static_cast<uint32_t>(DGW_DESTORY_HCOM_HANDLE), QsOperType::DESTROY_HCOM_HANDLE},
{static_cast<uint32_t>(BIND_HOSTPID), QsOperType::BIND_HOST_PID},
{static_cast<uint32_t>(UPDATE_CONFIG), QsOperType::UPDATE_CONFIG},
{static_cast<uint32_t>(QUERY_CONFIG_NUM), QsOperType::QUERY_CONFIG_NUM},
{static_cast<uint32_t>(QUERY_CONFIG), QsOperType::QUERY_CONFIG},
{static_cast<uint32_t>(QUERY_LINKSTATUS), QsOperType::QUERY_LINKSTATUS},
{static_cast<uint32_t>(QUERY_LINKSTATUS_V2), QsOperType::QUERY_LINKSTATUS_V2},
};
struct old_event_sync_msg {
int pid;
unsigned int dst_engine : 4;
unsigned int gid : 6;
unsigned int event_id : 6;
unsigned int subevent_id : 16;
char msg[];
};
}
RouterServer::RouterServer() : processing_(false), done_(false), processingExtra_(false), doneExtra_(true),
bindQueueGroupId_(static_cast<uint32_t>(BIND_QUEUE_GROUP_ID)),
running_(false), deviceId_(0U), srcPid_(-1), srcVersion_(0U),
srcGroupId_(-1), pipelineQueueId_(MAX_QUEUE_ID_NUM), subEventId_(0U),
deployMode_(QueueSchedulerRunMode::MULTI_PROCESS),
retCode_(static_cast<int32_t>(BQS_STATUS_OK)), attachedFlag_(false),
isAicpuEvent_(false), qsRouteListPtr_(nullptr), qsRouterHeadPtr_(nullptr),
qsRouterQueryPtr_(nullptr), drvSyncMsg_(nullptr), aicpuRspHead_(0UL),
f2nfGroupId_(0U), schedPolicy_(0UL), cfgInfoOperator_(nullptr), callHcclFlag_(false),
numaFlag_(false), readyToHandleMsg_(false), manageThreadStatus_(ThreadStatus::NOT_INIT),
needAttachGroup_(false), compatMsg_(false)
{}
void RouterServer::Destroy()
{
if (!running_) {
return;
}
BQS_LOG_INFO("[RouterServer]QS Server destroy.");
running_ = false;
queueRouteQueryList_.clear();
cv_.notify_all();
if (monitorQsEvent_.joinable()) {
monitorQsEvent_.join();
}
manageThreadStatus_ = ThreadStatus::NOT_INIT;
if (pipelineQueueId_ < MAX_QUEUE_ID_NUM) {
const auto ret = halQueueDestroy(deviceId_, pipelineQueueId_);
BQS_LOG_ERROR_WHEN(ret != DRV_ERROR_NONE,
"[RouterServer]Destroy relation event buff queue error, queue id[%u], ret[%d]",
pipelineQueueId_.load(), static_cast<int32_t>(ret));
}
cfgInfoOperator_ = nullptr;
BQS_LOG_RUN_INFO("[RouterServer]QS Server finish destroy.");
}
RouterServer::~RouterServer()
{
Destroy();
}
bool RouterServer::GetCallHcclFlag() const
{
return callHcclFlag_;
}
uint32_t RouterServer::GetPipelineQueueId() const
{
return pipelineQueueId_;
}
RouterServer &RouterServer::GetInstance()
{
static RouterServer instance;
return instance;
}
void RouterServer::HandleBqsMsg(event_info &info)
{
if (info.comm.event_id != EVENT_QS_MSG) {
BQS_LOG_ERROR("[RouterServer]Queue schedule do not support [%d] event",
static_cast<int32_t>(info.comm.event_id));
return;
}
subEventId_ = info.comm.subevent_id;
isAicpuEvent_ = (subEventId_ <= static_cast<uint32_t>(AICPU_RELATED_MESSAGE_SPLIT)) ? true : false;
if (!isAicpuEvent_) {
drvSyncMsg_ = PtrToPtr<char_t, event_sync_msg>(info.priv.msg);
}
if (!readyToHandleMsg_) {
BQS_LOG_WARN("[RouterServer] is not ready to HandleBqsMsg");
SendRspEvent(static_cast<int32_t>(BQS_STATUS_NOT_INIT));
return;
}
const auto iter = g_qsOperation.find(subEventId_);
if (iter == g_qsOperation.end()) {
BQS_LOG_RUN_INFO("[RouterServer]SubEventId is invalid[%d]", subEventId_);
SendRspEvent(static_cast<int32_t>(BQS_STATUS_PARAM_INVALID));
return;
}
if (isAicpuEvent_ && (deployMode_ == QueueSchedulerRunMode::MULTI_THREAD)) {
BQS_LOG_ERROR("[RouterServer]Thread mode[%u] does not sopport event[%u] from aicpu.",
static_cast<int32_t>(deployMode_), subEventId_);
return;
}
PreProcessEvent(info);
BQS_LOG_INFO("[RouterServer]HandleBqsMsg end, isAicpuEvent[%d].", static_cast<int32_t>(isAicpuEvent_));
return;
}
void RouterServer::PreProcessEvent(const event_info &info)
{
BQS_LOG_INFO("[RouterServer]PreProcess start operation[%u]", subEventId_);
const auto iter = g_qsOperation.find(subEventId_);
if (iter == g_qsOperation.end()) {
SendRspEvent(static_cast<int32_t>(BQS_STATUS_PARAM_INVALID));
BQS_LOG_ERROR("[RouterServer] RouterServer receive unsupported msg type:%d", subEventId_);
return;
}
const QsOperType operType = iter->second;
switch (operType) {
case QsOperType::BIND_INIT:
SendRspEvent(static_cast<int32_t>(ProcessBindInit(info)));
break;
case QsOperType::QUERY_NUM:
StatisticManager::GetInstance().GetBindStat();
ParseGetBindNumMsg(info);
break;
case QsOperType::RELATION_PROCESS: {
Mbuf *mBuf = nullptr;
const auto resultCode = ParseRelationInfo(&mBuf);
if (resultCode != BQS_STATUS_OK) {
BQS_LOG_ERROR("[RouterServer]Get detail message from mbuf failed ret[%d].",
static_cast<int32_t>(resultCode));
SendRspEvent(static_cast<int32_t>(resultCode));
if ((mBuf != nullptr) && (srcVersion_ != 0U)) {
const auto freeRet = halMbufFree(mBuf);
BQS_LOG_ERROR_WHEN(freeRet != static_cast<int32_t>(DRV_ERROR_NONE),
"Free mbuf failed, ret is %d", freeRet);
}
return;
}
BQS_LOG_INFO("[RouterServer]Start to process relation event[%u].", subEventId_);
ProcessQueueRelationEvent(mBuf);
break;
}
case QsOperType::UPDATE_CONFIG:
case QsOperType::CREATE_HCOM_HANDLE:
case QsOperType::DESTROY_HCOM_HANDLE:
case QsOperType::QUERY_CONFIG:
case QsOperType::QUERY_CONFIG_NUM: {
ProcessConfigEvent(operType);
break;
}
case QsOperType::QUERY_LINKSTATUS: {
ProcessQueryLinkStatusEvent();
break;
}
case QsOperType::QUERY_LINKSTATUS_V2: {
ProcessQueryLinkStatusEvent();
break;
}
default: {
SendRspEvent(static_cast<int32_t>(BQS_STATUS_PARAM_INVALID));
BQS_LOG_RUN_INFO("[RouterServer]RouterServer receive unsupported msg type:%u", subEventId_);
break;
}
}
return;
}
void RouterServer::ProcessConfigEvent(const QsOperType operType)
{
void *mbuf = nullptr;
auto drvRet = halQueueDeQueue(deviceId_, pipelineQueueId_, &mbuf);
if ((drvRet != DRV_ERROR_NONE) || (mbuf == nullptr)) {
BQS_LOG_ERROR("halQueueDeQueue from queue[%u] in device[%u] failed, error[%d]", pipelineQueueId_.load(),
deviceId_, static_cast<int32_t>(drvRet));
SendRspEvent(static_cast<int32_t>(BQS_STATUS_DRIVER_ERROR));
return;
}
auto resultCode = (cfgInfoOperator_ == nullptr) ? BQS_STATUS_INNER_ERROR :
cfgInfoOperator_->ParseConfigEvent(subEventId_, pipelineQueueId_, mbuf, srcVersion_);
if (resultCode == BQS_STATUS_WAIT) {
resultCode = WaitSyncMsgProc();
}
if (operType == QsOperType::CREATE_HCOM_HANDLE) {
callHcclFlag_ = true;
}
if (srcVersion_ != 0U) {
BQS_LOG_INFO("Enque mbuf back");
drvRet = halQueueEnQueue(deviceId_, pipelineQueueId_, mbuf);
if (drvRet != DRV_ERROR_NONE) {
BQS_LOG_ERROR("halQueueEnQueue into queue[%u] in device[%u] failed, error[%d]", pipelineQueueId_.load(),
deviceId_, static_cast<int32_t>(drvRet));
SendRspEvent(static_cast<int32_t>(BQS_STATUS_DRIVER_ERROR));
const auto freeRet = halMbufFree(PtrToPtr<void, Mbuf>(mbuf));
BQS_LOG_ERROR_WHEN(freeRet != static_cast<int32_t>(DRV_ERROR_NONE),
"Free mbuf failed, ret is %d", freeRet);
return;
}
}
BQS_LOG_INFO("config for operate[%d] resultCode is %d.", static_cast<int32_t>(operType),
static_cast<int32_t>(resultCode));
SendRspEvent(static_cast<int32_t>(resultCode));
}
void RouterServer::ProcessQueryLinkStatusEvent()
{
int32_t ret = static_cast<int32_t>(dgw::EntityManager::Instance(0U).CheckLinkStatus());
if ((ret == 0) && numaFlag_) {
ret = static_cast<int32_t>(dgw::EntityManager::Instance(1U).CheckLinkStatus());
}
SendRspEvent(ret);
}
void RouterServer::ProcessQueueRelationEvent(Mbuf *mbuf)
{
BqsStatus ret = BQS_STATUS_INNER_ERROR;
switch (subEventId_) {
case AICPU_BIND_QUEUE: {
StatisticManager::GetInstance().BindStat();
ret = WaitBindMsgProc();
break;
}
case ACL_BIND_QUEUE: {
StatisticManager::GetInstance().BindStat();
ret = WaitBindMsgProc();
break;
}
case AICPU_UNBIND_QUEUE: {
StatisticManager::GetInstance().UnbindStat();
ret = WaitBindMsgProc();
break;
}
case ACL_UNBIND_QUEUE: {
StatisticManager::GetInstance().UnbindStat();
ret = WaitBindMsgProc();
break;
}
case AICPU_QUERY_QUEUE: {
StatisticManager::GetInstance().GetBindStat();
ret = ParseGetBindDetailMsg();
break;
}
case ACL_QUERY_QUEUE: {
StatisticManager::GetInstance().GetBindStat();
ret = ParseGetBindDetailMsg();
break;
}
default:
BQS_LOG_ERROR("[RouterServer]unsupport subEventId[%u] in bind relation procedure", subEventId_);
break;
}
if (srcVersion_ != 0U) {
BQS_LOG_INFO("Enque mbuf back.");
const auto drvRet = halQueueEnQueue(deviceId_, pipelineQueueId_, mbuf);
if (drvRet != DRV_ERROR_NONE) {
BQS_LOG_ERROR("halQueueEnQueue into queue[%u] in device[%u] failed, error[%d].", pipelineQueueId_.load(),
deviceId_, static_cast<int32_t>(drvRet));
SendRspEvent(static_cast<int32_t>(BQS_STATUS_DRIVER_ERROR));
const auto freeRet = halMbufFree(mbuf);
BQS_LOG_ERROR_WHEN(freeRet != static_cast<int32_t>(DRV_ERROR_NONE),
"Free mbuf failed, ret is %d", freeRet);
return;
}
}
SendRspEvent(static_cast<int32_t>(ret));
return;
}
BqsStatus RouterServer::WaitBindMsgProc()
{
BQS_LOG_INFO("[RouterServer]Bind relation [add/del], stage [wait]");
auto ret = ParseBindUnbindMsg();
if (ret == BQS_STATUS_OK) {
ret = WaitSyncMsgProc();
}
BQS_LOG_INFO("[RouterServer]RouterServer WaitBindMsgProc end");
return ret;
}
BqsStatus RouterServer::AttachAndInitGroup()
{
BQS_LOG_INFO("[RouterServer]Attach and init group begin.");
int32_t drvRet = 0;
if (qsInitGroupName_.empty() && (!SubModuleInterface::GetInstance().GetStartFlag())) {
const std::unique_ptr<GroupQueryOutput> groupInfoPtr(new (std::nothrow) GroupQueryOutput());
if (groupInfoPtr == nullptr) {
BQS_LOG_ERROR("[RouterServer] Fail to allocate GroupQueryOutput");
return BQS_STATUS_INNER_ERROR;
}
GroupQueryOutput &groupInfo = *(groupInfoPtr.get());
uint32_t groupInfoLen = 0U;
pid_t curPid = drvDeviceGetBareTgid();
drvRet = halGrpQuery(GRP_QUERY_GROUPS_OF_PROCESS, &curPid, static_cast<uint32_t>(sizeof(curPid)),
reinterpret_cast<void *>(&groupInfo), &groupInfoLen);
if (drvRet != static_cast<int32_t>(DRV_ERROR_NONE)) {
BQS_LOG_ERROR("[RouterServer]halGrpQuery of qs[%d] failed before attached,ret[%d]", curPid, drvRet);
return BQS_STATUS_DRIVER_ERROR;
}
if (groupInfoLen == 0U) {
BQS_LOG_ERROR("[RouterServer]QS should be add sharepool group before initial by aicpu or acl.");
return BQS_STATUS_INNER_ERROR;
}
if ((groupInfoLen % sizeof(groupInfo.grpQueryGroupsOfProcInfo[0])) != 0U) {
BQS_LOG_ERROR("[RouterServer]Group info size[%d] is invalid", groupInfoLen);
return BQS_STATUS_DRIVER_ERROR;
}
const uint32_t groupNum = static_cast<uint32_t>(groupInfoLen / sizeof(groupInfo.grpQueryGroupsOfProcInfo[0]));
for (uint32_t i = 0U; i < groupNum; ++i) {
drvRet = halGrpAttach(groupInfo.grpQueryGroupsOfProcInfo[i].groupName, 0);
if (drvRet != static_cast<int32_t>(DRV_ERROR_NONE)) {
BQS_LOG_ERROR("[RouterServer]Group[%s] attach failed for slave aicpusd[%d] ret[%d]",
groupInfo.grpQueryGroupsOfProcInfo[i].groupName, curPid, drvRet);
return BQS_STATUS_DRIVER_ERROR;
}
BQS_LOG_INFO("[RouterServer] halGrpAttach execute succ. group[%s] was attached by QS",
groupInfo.grpQueryGroupsOfProcInfo[i].groupName);
}
}
attachedFlag_ = true;
BuffCfg defaultCfg = {};
drvRet = halBuffInit(&defaultCfg);
if ((drvRet != static_cast<int32_t>(DRV_ERROR_NONE)) &&
(drvRet != static_cast<int32_t>(DRV_ERROR_REPEATED_INIT))) {
BQS_LOG_ERROR("[RouterServer] Buffer initial failed for qs. ret[%d]", drvRet);
return BQS_STATUS_DRIVER_ERROR;
}
BQS_LOG_INFO("[RouterServer] Buffer init success ret[%d]", drvRet);
return BQS_STATUS_OK;
}
BqsStatus RouterServer::CreateAndGrantPipelineQueue()
{
BQS_LOG_INFO("[RouterServer]Create and grant pipeline queue begin.");
const std::unique_lock<std::mutex> lk(mutex_);
QueueAttr queAttr = {};
std::string nameStr(PIPELINE_QUEUE_NAME);
pid_t curPidTemp = 0;
if (bqs::GetRunContext() == bqs::RunContext::HOST) {
curPidTemp = getpid();
queAttr.deploy_type = LOCAL_QUEUE_DEPLOY;
} else {
curPidTemp = drvDeviceGetBareTgid();
queAttr.deploy_type = CLIENT_QUEUE_DEPLOY;
}
const uint32_t curPid = static_cast<uint32_t>(curPidTemp);
nameStr += std::to_string(curPid);
const auto memcpyRet = memcpy_s(queAttr.name, static_cast<uint32_t>(QUEUE_MAX_STR_LEN),
nameStr.c_str(), nameStr.length() + 1UL);
if (memcpyRet != EOK) {
BQS_LOG_ERROR("[RouterServer]CreateAndGrantPipelineQueue memcpy_s failed, ret=%d.", memcpyRet);
return BQS_STATUS_INNER_ERROR;
}
queAttr.depth = 2U;
uint32_t queueId = 0U;
auto drvRet = halQueueCreate(deviceId_, &queAttr, &queueId);
if ((drvRet != DRV_ERROR_NONE) || (queueId >= MAX_QUEUE_ID_NUM)) {
BQS_LOG_ERROR("[RouterServer]Create queue[%s] error or qID[%u] is invalid, ret[%d]",
PIPELINE_QUEUE_NAME.c_str(), queueId, static_cast<int32_t>(drvRet));
return BQS_STATUS_DRIVER_ERROR;
}
drvRet = halQueueAttach(deviceId_, queueId, 0);
if (drvRet != DRV_ERROR_NONE) {
BQS_LOG_ERROR("Fail to attach queue[%ud], result[%d]", queueId, static_cast<int32_t>(drvRet));
return BQS_STATUS_DRIVER_ERROR;
}
pipelineQueueId_ = queueId;
if (deployMode_ == QueueSchedulerRunMode::MULTI_THREAD) {
BQS_LOG_INFO("[RouterServer]Thread mode need not grant queue to other process");
return BQS_STATUS_OK;
}
drvRet = halQueueGrant(deviceId_, static_cast<int32_t>(queueId), srcPid_, ADMIN_QUEUE_ATTR);
if (drvRet != DRV_ERROR_NONE) {
BQS_LOG_ERROR("[RouterServer]Fail to add queue[%d] authority for aicpusd[%d], result[%d].",
queueId, srcPid_, static_cast<int32_t>(drvRet));
return BQS_STATUS_DRIVER_ERROR;
}
BQS_LOG_RUN_INFO("Success to init pipelineQ[%u].", pipelineQueueId_.load());
return BQS_STATUS_OK;
}
BqsStatus RouterServer::ProcessBindInit(const event_info &info)
{
if (info.priv.msg_len != sizeof(QsBindInit)) {
BQS_LOG_ERROR("[RouterServer]Bind initial event message invalid, msgLen[%u]", info.priv.msg_len);
return BQS_STATUS_PARAM_INVALID;
}
if ((pipelineQueueId_ < MAX_QUEUE_ID_NUM) && (srcPid_ != -1)) {
BQS_LOG_RUN_INFO("Pipeline queue already existed[%d], return pipelienQueueid", pipelineQueueId_.load());
return BQS_STATUS_OK;
}
const QsBindInit * const bindInitMsg = reinterpret_cast<const QsBindInit *>(info.priv.msg);
aicpuRspHead_ = bindInitMsg->syncEventHead;
srcPid_ = bindInitMsg->pid;
srcVersion_ = bindInitMsg->majorVersion;
srcGroupId_ = static_cast<int32_t>(bindInitMsg->grpId);
BQS_LOG_RUN_INFO("[RouterServer]Get hostpid[%d] srcGroup[%d], srcVersion[%u]",
srcPid_, srcGroupId_.load(), srcVersion_);
if (((deployMode_ == QueueSchedulerRunMode::SINGLE_PROCESS) ||
(deployMode_ == QueueSchedulerRunMode::MULTI_PROCESS)) && (!attachedFlag_)) {
BQS_LOG_INFO("[RouterServer]start up attach and init group.");
const auto attachRet = AttachAndInitGroup();
if (attachRet != BQS_STATUS_OK) {
BQS_LOG_ERROR("[RouterServer]AttachAndInitGroup failed, ret[%d].", static_cast<int32_t>(attachRet));
return attachRet;
}
}
if (qsInitGroupName_.empty()) {
auto queueInitRet = QueueManager::GetInstance().InitQueue();
if (queueInitRet != BQS_STATUS_OK) {
BQS_LOG_ERROR("[RouterServer] Queue init failed");
return queueInitRet;
}
if (numaFlag_) {
queueInitRet = QueueManager::GetInstance().InitQueueExtra();
if (queueInitRet != BQS_STATUS_OK) {
BQS_LOG_ERROR("[RouterServer] Queue init failed");
return queueInitRet;
}
}
}
const auto ret = CreateAndGrantPipelineQueue();
if (ret != BQS_STATUS_OK) {
return ret;
}
BQS_LOG_RUN_INFO("First bind initial success, srcPid[%d], srvGroupId[%d], pipelineQueueId[%u]",
srcPid_, srcGroupId_.load(), pipelineQueueId_.load());
return BQS_STATUS_OK;
}
void RouterServer::FillRspContent(QsProcMsgRsp &retRsp, const int32_t resultCode)
{
retRsp.syncEventHead = isAicpuEvent_ ? aicpuRspHead_ : 0UL;
retRsp.retCode = resultCode;
retRsp.minorVersion = MINOR_VERSION;
retRsp.majorVersion = MAJOR_VERSION;
if ((subEventId_ == static_cast<uint32_t>(AICPU_BIND_QUEUE_INIT)) ||
(subEventId_ == static_cast<uint32_t>(ACL_BIND_QUEUE_INIT))) {
retRsp.retValue = (resultCode == static_cast<int32_t>(BQS_STATUS_OK)) ? pipelineQueueId_.load()
: MAX_QUEUE_ID_NUM;
}
if ((subEventId_ == static_cast<uint32_t>(AICPU_QUERY_QUEUE_NUM)) ||
(subEventId_ == static_cast<uint32_t>(ACL_QUERY_QUEUE_NUM))) {
retRsp.retValue = (resultCode != static_cast<int32_t>(BQS_STATUS_OK)) ? 0U :
static_cast<uint32_t>(queueRouteQueryList_.size());
queueRouteQueryList_.clear();
}
if ((subEventId_ == static_cast<uint32_t>(AICPU_QUERY_QUEUE)) ||
(subEventId_ == static_cast<uint32_t>(ACL_QUERY_QUEUE)) ||
(subEventId_ == static_cast<uint32_t>(AICPU_BIND_QUEUE)) ||
(subEventId_ == static_cast<uint32_t>(ACL_BIND_QUEUE)) ||
(subEventId_ == static_cast<uint32_t>(AICPU_UNBIND_QUEUE)) ||
(subEventId_ == static_cast<uint32_t>(ACL_UNBIND_QUEUE))) {
retRsp.retValue = pipelineQueueId_;
}
return;
}
void RouterServer::SendRspEvent(const int32_t result)
{
BQS_LOG_INFO("[RouterServer]Start to response message subeventid[%u]", subEventId_);
QsProcMsgRsp retRsp = {};
FillRspContent(retRsp, result);
event_summary qsEvent = {};
qsEvent.msg = PtrToPtr<QsProcMsgRsp, char_t>(&retRsp);
qsEvent.msg_len = static_cast<uint32_t>(sizeof(retRsp));
auto drvRet = DRV_ERROR_NONE;
if (!isAicpuEvent_) {
BQS_LOG_INFO("[RouterServer] Do ACL response");
qsEvent.dst_engine = compatMsg_ ? PtrToPtr<event_sync_msg, old_event_sync_msg>(drvSyncMsg_)->dst_engine
: drvSyncMsg_->dst_engine;
qsEvent.policy = ONLY;
qsEvent.pid = compatMsg_ ? PtrToPtr<event_sync_msg, old_event_sync_msg>(drvSyncMsg_)->pid
: drvSyncMsg_->pid;
qsEvent.grp_id = compatMsg_ ? PtrToPtr<event_sync_msg, old_event_sync_msg>(drvSyncMsg_)->gid
: drvSyncMsg_->gid;
const int32_t eventId = compatMsg_ ? PtrToPtr<event_sync_msg, old_event_sync_msg>(drvSyncMsg_)->event_id
: drvSyncMsg_->event_id;
qsEvent.event_id = static_cast<EVENT_ID>(eventId);
qsEvent.subevent_id = compatMsg_ ? PtrToPtr<event_sync_msg, old_event_sync_msg>(drvSyncMsg_)->subevent_id
: drvSyncMsg_->subevent_id;
drvRet = halEschedSubmitEvent(deviceId_, &qsEvent);
drvSyncMsg_ = nullptr;
BQS_LOG_INFO("[SendRspEvent] dst_engine[%u], pid[%d], grp_id[%u], eventId[%d], subevent_id[%u], deviceId[%u]",
qsEvent.dst_engine, qsEvent.pid, qsEvent.grp_id, qsEvent.event_id, qsEvent.subevent_id, deviceId_);
} else {
BQS_LOG_INFO("[RouterServer] Do AICPU response");
qsEvent.pid = srcPid_;
qsEvent.grp_id = static_cast<uint32_t>(srcGroupId_);
qsEvent.event_id = EVENT_QS_MSG;
qsEvent.msg_len = static_cast<uint32_t>(sizeof(QsProcMsgRspDstAicpu));
const auto iter = g_reqRspMapping.find(static_cast<int32_t>(subEventId_));
if (iter != g_reqRspMapping.end()) {
qsEvent.subevent_id = static_cast<uint32_t>(iter->second);
} else {
BQS_LOG_ERROR("[RouterServer]ERROR Invalid subeventId[%u]", subEventId_);
}
drvRet = halEschedSubmitEvent(deviceId_, &qsEvent);
aicpuRspHead_ = 0UL;
}
BQS_LOG_ERROR_WHEN(drvRet != DRV_ERROR_NONE, "[RouterServer]ERROR failed to submit event[%u], result[%d].",
subEventId_, static_cast<int32_t>(drvRet));
BQS_LOG_INFO("[RouterServer]Finish response message subeventid[%u] ret[%d]", subEventId_, result);
qsRouteListPtr_ = nullptr;
qsRouterHeadPtr_ = nullptr;
subEventId_ = 0U;
return;
}
void RouterServer::ProcessBindQueue(const uint32_t index)
{
BQS_LOG_INFO("[RouterServer]Bind relation [add], stage [server:process].");
auto &relationInstance = BindRelation::GetInstance();
retCode_ = static_cast<int32_t>(BQS_STATUS_OK);
QueueRoute *queueRouteList = qsRouteListPtr_;
for (uint32_t i = 0U; i < qsRouterHeadPtr_->routeNum; ++i) {
if (queueRouteList->status != static_cast<int32_t>(BQS_STATUS_OK)) {
retCode_ = static_cast<int32_t>(BQS_STATUS_QUEUE_AHTU_ERROR);
queueRouteList->status = 0;
queueRouteList = queueRouteList + 1U;
continue;
}
EntityInfo srcEntity = CreateBasicEntityInfo(queueRouteList->srcId,
static_cast<dgw::EntityType>(queueRouteList->srcType));
EntityInfo dstEntity = CreateBasicEntityInfo(queueRouteList->dstId,
static_cast<dgw::EntityType>(queueRouteList->dstType));
const auto result = relationInstance.Bind(srcEntity, dstEntity, index);
if (result == BQS_STATUS_RETRY) {
queueRouteList = queueRouteList + 1U;
continue;
} else if (result != BQS_STATUS_OK) {
retCode_ = static_cast<int32_t>(result);
queueRouteList->status = 0;
} else {
queueRouteList->status = 1;
}
queueRouteList = queueRouteList + 1U;
BQS_LOG_RUN_INFO("Bind relation [add], stage [server:process], relation [src:%s, dst:%s, result:%d]",
srcEntity.ToString().c_str(), dstEntity.ToString().c_str(), static_cast<int32_t>(result));
}
relationInstance.Order(index);
return;
}
void RouterServer::ProcessUnbindQueue(const uint32_t index)
{
BQS_LOG_INFO("[RouterServer]Unbind relation [del], stage [server:process].");
QueueRoute *queueRouteList = qsRouteListPtr_;
retCode_ = static_cast<int32_t>(BQS_STATUS_OK);
auto &relationInstance = BindRelation::GetInstance();
for (uint32_t i = 0U; i < qsRouterHeadPtr_->routeNum; ++i) {
if (queueRouteList->status != static_cast<int32_t>(BQS_STATUS_OK)) {
retCode_ = static_cast<int32_t>(BQS_STATUS_QUEUE_ID_ERROR);
queueRouteList->status = 1;
queueRouteList = queueRouteList + 1U;
continue;
}
EntityInfo srcEntity = CreateBasicEntityInfo(queueRouteList->srcId,
static_cast<dgw::EntityType>(queueRouteList->srcType));
EntityInfo dstEntity = CreateBasicEntityInfo(queueRouteList->dstId,
static_cast<dgw::EntityType>(queueRouteList->dstType));
const auto result = relationInstance.UnBind(srcEntity, dstEntity, index);
if (result == BQS_STATUS_RETRY) {
queueRouteList = queueRouteList + 1U;
continue;
} else if (result != BQS_STATUS_OK) {
retCode_ = static_cast<int32_t>(result);
queueRouteList->status = 1;
} else {
queueRouteList->status = 0;
}
queueRouteList = queueRouteList + 1U;
BQS_LOG_RUN_INFO("Bind relation [del], stage [server:process], relation [src %s," \
"dst %s, result:%d]",
srcEntity.ToString().c_str(), dstEntity.ToString().c_str(), static_cast<int32_t>(result));
}
relationInstance.Order(index);
return;
}
* Bqs server enqueue bind msg request process
* @return NA
*/
void RouterServer::BindMsgProc(const uint32_t index)
{
BQS_LOG_INFO("[RouterServer]RouterServer BindMsgProc begin.");
auto &processing = (index == 0U) ? processing_ : processingExtra_;
auto &done = (index == 0U) ? done_ : doneExtra_;
const std::unique_lock<std::mutex> lk(mutex_);
processing = true;
if ((subEventId_ == static_cast<uint32_t>(AICPU_BIND_QUEUE)) ||
(subEventId_ == static_cast<uint32_t>(ACL_BIND_QUEUE))) {
ProcessBindQueue(index);
} else if ((subEventId_ == static_cast<uint32_t>(AICPU_UNBIND_QUEUE)) ||
(subEventId_ == static_cast<uint32_t>(ACL_UNBIND_QUEUE))) {
ProcessUnbindQueue(index);
} else if ((subEventId_ == static_cast<uint32_t>(QueueSubEventType::UPDATE_CONFIG))) {
retCode_ = (cfgInfoOperator_ == nullptr) ? static_cast<int32_t>(BQS_STATUS_INNER_ERROR) :
static_cast<int32_t>(cfgInfoOperator_->ProcessUpdateConfig(index));
BQS_LOG_INFO("[RouterServer] Process update config ret is %d.", retCode_);
} else {
BQS_LOG_ERROR("[RouterServer]Invalid subEventId_[%d] in bind relation process.", subEventId_);
}
processing = false;
done = true;
cv_.notify_one();
BQS_LOG_INFO("[RouterServer]RouterServer BindMsgProc end.");
return;
}
* Init bqs server, including init easycomm server and bind relation
* @return BQS_STATUS_OK:success other:failed
*/
BqsStatus RouterServer::InitRouterServer(const InitQsParams ¶ms)
{
BQS_LOG_INFO("[RouterServer]RouterServer Init begin");
(void)signal(SIGPIPE, SIG_IGN);
qsInitGroupName_ = params.qsInitGrpName;
f2nfGroupId_ = params.f2nfGroupId;
schedPolicy_ = params.schedPolicy;
cfgInfoOperator_.reset(new (std::nothrow) ConfigInfoOperator(params.deviceId, qsInitGroupName_));
if (cfgInfoOperator_ == nullptr) {
BQS_LOG_ERROR("malloc memory for cfgInfoOperator_ failed.");
return BQS_STATUS_INNER_ERROR;
}
SubscribeBufEvent();
if (!running_) {
running_ = true;
deviceId_ = params.deviceId;
deployMode_ = params.runMode;
numaFlag_ = params.numaFlag;
needAttachGroup_ = params.needAttachGroup;
if ((bqs::GetRunContext() == bqs::RunContext::HOST) && (&halShrIdGetAttribute == nullptr)) {
compatMsg_ = true;
}
BQS_LOG_INFO("compatMsg_ is %d", compatMsg_);
try {
monitorQsEvent_ = std::thread(&RouterServer::ManageQsEvent, this);
} catch(std::exception &threadException) {
BQS_LOG_ERROR("RouterServer Init thread failure, %s", threadException.what());
return BQS_STATUS_INNER_ERROR;
}
std::unique_lock<std::mutex> lk(manageThreadMutex_);
manageThreadCv_.wait(lk, [this] { return manageThreadStatus_ != ThreadStatus::NOT_INIT; });
if (manageThreadStatus_ != ThreadStatus::INIT_SUCCESS) {
BQS_LOG_ERROR("RouterServer thread fail to start");
return BQS_STATUS_INNER_ERROR;
}
BQS_LOG_INFO("[RouterServer]RouterServer Init success.");
} else {
BQS_LOG_WARN("RouterServer is already inited");
}
return BQS_STATUS_OK;
}
BqsStatus RouterServer::ParseRelationInfo(Mbuf **mbufPtr)
{
Mbuf *mBuf = nullptr;
const auto drvRet = halQueueDeQueue(deviceId_, pipelineQueueId_, PtrToPtr<Mbuf *, void*>(&mBuf));
if ((drvRet != DRV_ERROR_NONE) || (mBuf == nullptr)) {
BQS_LOG_ERROR("[RouterServer]halQueueDeQueue from queue[%u] in device[%u] failed, error[%d]",
pipelineQueueId_.load(), deviceId_, static_cast<int32_t>(drvRet));
return BQS_STATUS_DRIVER_ERROR;
}
*mbufPtr = mBuf;
qsRouterHeadPtr_ = nullptr;
const auto getBuffRet = halMbufGetBuffAddr(mBuf, reinterpret_cast<void **>(&qsRouterHeadPtr_));
if ((getBuffRet != static_cast<int32_t>(DRV_ERROR_NONE)) || (qsRouterHeadPtr_ == nullptr)) {
BQS_LOG_ERROR("[RouterServer]halMbufGetBuffAddr from queue[%u] in device[%u] failed, error[%d]",
pipelineQueueId_.load(), deviceId_, getBuffRet);
return BQS_STATUS_DRIVER_ERROR;
}
if (isAicpuEvent_) {
aicpuRspHead_ = qsRouterHeadPtr_->userData;
}
subEventId_ = qsRouterHeadPtr_->subEventId;
BQS_LOG_INFO("[RouterServer]Parse head[%lu] subEvnetId[%u] from mbuff success.", aicpuRspHead_, subEventId_);
if ((subEventId_ == static_cast<uint32_t>(AICPU_QUERY_QUEUE)) ||
(subEventId_ == static_cast<uint32_t>(ACL_QUERY_QUEUE))) {
if ((((qsRouterHeadPtr_->routeNum * sizeof(QueueRoute)) + sizeof(QsRouteHead)) + sizeof(QueueRouteQuery)) !=
qsRouterHeadPtr_->length) {
BQS_LOG_ERROR("[RouterServer]RouteNum[%d] is inconsistence with dataLen[%d] in subEventId[%u]",
qsRouterHeadPtr_->routeNum, qsRouterHeadPtr_->length, subEventId_);
return BQS_STATUS_PARAM_INVALID;
}
qsRouterQueryPtr_ = reinterpret_cast<QueueRouteQuery *>(
reinterpret_cast<uint8_t *>(qsRouterHeadPtr_) + sizeof(QsRouteHead));
BQS_LOG_INFO("[RouterServer]Get query info success. queryType[%d]", qsRouterQueryPtr_->queryType);
qsRouteListPtr_ = reinterpret_cast<QueueRoute *>(
reinterpret_cast<uint8_t *>(qsRouterQueryPtr_) + sizeof(QueueRouteQuery));
} else {
if (((qsRouterHeadPtr_->routeNum * sizeof(QueueRoute)) + sizeof(QsRouteHead)) != qsRouterHeadPtr_->length) {
BQS_LOG_ERROR("[RouterServer]RouteNum[%d] is inconsistence with dataLen[%d] in subEventId[%u]",
qsRouterHeadPtr_->routeNum, qsRouterHeadPtr_->length, subEventId_);
return BQS_STATUS_PARAM_INVALID;
}
qsRouterQueryPtr_ = nullptr;
qsRouteListPtr_ = reinterpret_cast<QueueRoute *>(
reinterpret_cast<uint8_t *>(qsRouterHeadPtr_) + sizeof(QsRouteHead));
}
BQS_LOG_INFO("[RouterServer]Get relation mbuff success, bind/unbind queue num[%d]", qsRouterHeadPtr_->routeNum);
return BQS_STATUS_OK;
}
BqsStatus RouterServer::ParseBindUnbindMsg() const
{
BQS_LOG_INFO("[RouterServer]Bind relation [add/del], stage [server:parse and check]");
auto resultCode = BQS_STATUS_QUEUE_AHTU_ERROR;
QueueRoute *queueRouteList = qsRouteListPtr_;
for (uint32_t i = 0U; i < qsRouterHeadPtr_->routeNum; ++i) {
EntityInfo srcEntity = CreateBasicEntityInfo(queueRouteList->srcId,
static_cast<dgw::EntityType>(queueRouteList->srcType));
EntityInfo dstEntity = CreateBasicEntityInfo(queueRouteList->dstId,
static_cast<dgw::EntityType>(queueRouteList->dstType));
BQS_LOG_INFO("[RouterServer]Src[id:%u type:%d] Dst[id:%u type:%d]",
srcEntity.GetId(), static_cast<int32_t>(srcEntity.GetType()),
dstEntity.GetId(), static_cast<int32_t>(dstEntity.GetType()));
if ((srcEntity.GetId() >= MAX_QUEUE_ID_NUM) || (dstEntity.GetId() >= MAX_QUEUE_ID_NUM)) {
BQS_LOG_ERROR("[RouterServer]Src[%s] or Dst[%s] is invalid in this "
"bind/unbind relation", srcEntity.ToString().c_str(), dstEntity.ToString().c_str());
queueRouteList->status = static_cast<int32_t>(BQS_STATUS_QUEUE_ID_ERROR);
queueRouteList = queueRouteList + 1;
continue;
}
if ((subEventId_ == static_cast<uint32_t>(AICPU_BIND_QUEUE)) ||
(subEventId_ == static_cast<uint32_t>(ACL_BIND_QUEUE))) {
const auto ret = (cfgInfoOperator_ == nullptr) ? BQS_STATUS_INNER_ERROR :
cfgInfoOperator_->AttachAndCheckQueue(srcEntity, dstEntity);
if (ret != BQS_STATUS_OK) {
BQS_LOG_ERROR("[RouterServer]Src[%s] Dst[%s] do attach queue "
"and check auth failed", srcEntity.ToString().c_str(), dstEntity.ToString().c_str());
queueRouteList->status = static_cast<int32_t>(BQS_STATUS_QUEUE_AHTU_ERROR);
queueRouteList = queueRouteList + 1;
continue;
}
}
resultCode = BQS_STATUS_OK;
queueRouteList->status = static_cast<int32_t>(BQS_STATUS_OK);
queueRouteList = queueRouteList + 1;
}
BQS_LOG_INFO("[RouterServer]Finish parse bind/unbind message,resultCode[%d]", static_cast<int32_t>(resultCode));
return resultCode;
}
void RouterServer::FillRoutes(const EntityInfo &src, const EntityInfo &dst, const BindRelationStatus status)
{
QueueRoute queueRouteInfo = {};
queueRouteInfo.srcId = src.GetId();
queueRouteInfo.dstId = dst.GetId();
queueRouteInfo.srcType = static_cast<int16_t>(src.GetType());
queueRouteInfo.dstType = static_cast<int16_t>(dst.GetType());
queueRouteInfo.status = static_cast<int32_t>(status);
queueRouteQueryList_.emplace_back(queueRouteInfo);
}
void RouterServer::SearchRelation(const MapEnitityInfoToInfoSet &relationMap, const EntityInfo& entityInfo,
const BindRelationStatus status, bool bySrc)
{
const auto iter = relationMap.find(entityInfo);
if (iter == relationMap.end()) {
return;
}
const auto dstSet = iter->second;
BQS_LOG_INFO("[RouterServer]Bind relation [get], stage [server:process], relation [size:%zu].", dstSet.size());
if (bySrc) {
for (auto setIter = dstSet.begin(); setIter != dstSet.end(); ++setIter) {
FillRoutes(entityInfo, *setIter, status);
}
return;
}
for (auto setIter = dstSet.begin(); setIter != dstSet.end(); ++setIter) {
FillRoutes(*setIter, entityInfo, status);
}
}
* Assembly response of get bind message according to src entity
* @return Number of query results
*/
void RouterServer::GetBindRspBySingle(const EntityInfo& entityInfo, const uint32_t &queryType)
{
BQS_LOG_INFO("[RouterServer]RouterServer serialize get bind rsponse by entityId[%u], entityType[%d], Type[%d].",
entityInfo.GetId(), static_cast<int32_t>(entityInfo.GetType()), queryType);
auto &relationInstance = BindRelation::GetInstance();
queueRouteQueryList_.clear();
if ((queryType != static_cast<uint32_t>(BQS_QUERY_TYPE_SRC)) &&
(queryType != static_cast<uint32_t>(BQS_QUERY_TYPE_DST))) {
BQS_LOG_ERROR("[RouterServer]QueryType[%d] is not supported.", queryType);
return;
}
if (queryType == static_cast<uint32_t>(BQS_QUERY_TYPE_SRC)) {
SearchRelation(relationInstance.GetSrcToDstRelation(), entityInfo, BindRelationStatus::RelationBind, true);
if (numaFlag_) {
SearchRelation(relationInstance.GetSrcToDstExtraRelation(), entityInfo,
BindRelationStatus::RelationBind, true);
}
SearchRelation(relationInstance.GetAbnormalSrcToDstRelation(), entityInfo,
BindRelationStatus::RelationAbnormalForQError, true);
if (queueRouteQueryList_.empty()) {
BQS_LOG_WARN("[RouterServer] record does not exist according to src entityId:[%u]", entityInfo.GetId());
}
} else {
SearchRelation(relationInstance.GetDstToSrcRelation(), entityInfo, BindRelationStatus::RelationBind, false);
if (numaFlag_) {
SearchRelation(relationInstance.GetDstToSrcExtraRelation(), entityInfo,
BindRelationStatus::RelationBind, false);
}
SearchRelation(relationInstance.GetAbnormalDstToSrcRelation(), entityInfo,
BindRelationStatus::RelationAbnormalForQError, false);
}
if (queueRouteQueryList_.empty()) {
BQS_LOG_WARN("RouterServer get relation according to dst:%u failed, record does not exist", entityInfo.GetId());
}
}
bool RouterServer::FindRelation(const MapEnitityInfoToInfoSet &relationMap, const EntityInfo& srcInfo,
const EntityInfo& dstInfo) const
{
const auto srcIter = relationMap.find(srcInfo);
return ((srcIter != relationMap.end()) && (srcIter->second.count(dstInfo) != 0UL));
}
void RouterServer::TransRouteWithEntityInfo(const EntityInfo& srcInfo, const EntityInfo& dstInfo, const int32_t status,
QueueRoute &routeInfo) const
{
routeInfo.srcId = srcInfo.GetId();
routeInfo.dstId = dstInfo.GetId();
routeInfo.srcType = static_cast<int16_t>(srcInfo.GetType());
routeInfo.dstType = static_cast<int16_t>(dstInfo.GetType());
routeInfo.status = static_cast<int32_t>(status);
}
* Assembly response of get bind message according to dst queueId, one-to-one relation
* @return NA
*/
void RouterServer::GetBindRspByDouble(const EntityInfo& src, const EntityInfo& dst, const uint32_t &queryType)
{
BQS_LOG_INFO("[RouterServer]RouterServer serialize get bind rsponse by srcId[%u], srcType[%d], dstId[%u], "
"dstType[%d], Type[%u]",
src.GetId(), static_cast<int32_t>(src.GetType()), dst.GetId(), static_cast<int32_t>(dst.GetType()), queryType);
queueRouteQueryList_.clear();
if (queryType == static_cast<uint32_t>(BQS_QUERY_TYPE_SRC_OR_DST)) {
GetBindRspBySingle(src, static_cast<uint32_t>(BQS_QUERY_TYPE_SRC));
if (queueRouteQueryList_.size() > 0UL) {
return;
}
GetBindRspBySingle(dst, static_cast<uint32_t>(BQS_QUERY_TYPE_DST));
return;
}
if (queryType == static_cast<uint32_t>(BQS_QUERY_TYPE_SRC_AND_DST)) {
auto status = BindRelationStatus::RelationUnknown;
if (FindRelation(BindRelation::GetInstance().GetSrcToDstRelation(), src, dst) ||
(numaFlag_ && (FindRelation(BindRelation::GetInstance().GetSrcToDstExtraRelation(), src, dst)))) {
status = BindRelationStatus::RelationBind;
} else {
if (FindRelation(BindRelation::GetInstance().GetAbnormalSrcToDstRelation(), src, dst)) {
status = BindRelationStatus::RelationAbnormalForQError;
}
}
if (status != BindRelationStatus::RelationUnknown) {
QueueRoute queueRouteInfo = {};
TransRouteWithEntityInfo(src, dst, static_cast<int32_t>(status), queueRouteInfo);
queueRouteQueryList_.emplace_back(queueRouteInfo);
}
return;
}
BQS_LOG_ERROR("[RouterServer]QueryType[%d] is not supported.", queryType);
return;
}
void RouterServer::GetAllAbnormalBind()
{
BQS_LOG_INFO("[RouterServer]RouterServer serialize get all abnormal bind rsponse");
queueRouteQueryList_.clear();
auto &relationInstance = BindRelation::GetInstance();
auto &abnormalSrcToDstRelation = relationInstance.GetAbnormalSrcToDstRelation();
for (auto iter = abnormalSrcToDstRelation.begin(); iter != abnormalSrcToDstRelation.end(); ++iter) {
const auto &src = iter->first;
const auto &dstSet = iter->second;
for (auto &dst : dstSet) {
QueueRoute queueRouteInfo = {};
TransRouteWithEntityInfo(src, dst, static_cast<int32_t>(BindRelationStatus::RelationAbnormalForQError),
queueRouteInfo);
queueRouteQueryList_.emplace_back(queueRouteInfo);
}
}
}
BqsStatus RouterServer::ProcessGetBindMsg(const uint32_t &queryType, const EntityInfo& src, const EntityInfo& dst)
{
switch (queryType) {
case BQS_QUERY_TYPE_SRC:
GetBindRspBySingle(src, queryType);
break;
case BQS_QUERY_TYPE_DST:
GetBindRspBySingle(dst, queryType);
break;
case BQS_QUERY_TYPE_SRC_OR_DST:
case BQS_QUERY_TYPE_SRC_AND_DST:
GetBindRspByDouble(src, dst, queryType);
break;
case BQS_QUERY_TYPE_ABNORMAL_FOR_QUEUE_ERROR:
GetAllAbnormalBind();
break;
default:
BQS_LOG_ERROR("[RouterServer]Unsupported query type"
"{0:src, 1:dst, 2:src-or-dst, 3:src-and-dst, 100:abnormal-all}:%u", queryType);
break;
}
if ((subEventId_ == static_cast<uint32_t>(AICPU_QUERY_QUEUE_NUM)) ||
(subEventId_ == static_cast<uint32_t>(ACL_QUERY_QUEUE_NUM))) {
return BQS_STATUS_OK;
}
if (queueRouteQueryList_.size() != qsRouterHeadPtr_->routeNum) {
BQS_LOG_ERROR("[RouterServer]Prepare number[%d] is different with real route number[%zu].",
qsRouterHeadPtr_->routeNum, queueRouteQueryList_.size());
return BQS_STATUS_PARAM_INVALID;
}
for (size_t i = 0UL; i < qsRouterHeadPtr_->routeNum; i++) {
*qsRouteListPtr_ = queueRouteQueryList_[i];
BQS_LOG_INFO("[RouterServer]Query bind relation srcId[%u] srcType[%d] dstId[%u] dstType[%d]",
qsRouteListPtr_->srcId, static_cast<int32_t>(qsRouteListPtr_->srcType), qsRouteListPtr_->dstId,
static_cast<int32_t>(qsRouteListPtr_->dstType));
qsRouteListPtr_ = qsRouteListPtr_ + 1;
}
queueRouteQueryList_.clear();
return BQS_STATUS_OK;
}
void RouterServer::ParseGetBindNumMsg(const event_info &info)
{
BQS_LOG_INFO("[RouterServer]Bind relation number [get], stage [server:process], type [request]");
if (info.priv.msg_len != sizeof(QueueRouteQuery)) {
BQS_LOG_ERROR("[RouterServer]Query event[%u] message invalid, msgLen[%u]", subEventId_, info.priv.msg_len);
SendRspEvent(static_cast<int32_t>(BQS_STATUS_PARAM_INVALID));
return;
}
const QueueRouteQuery * const queueRouteQuery = PtrToPtr<const char_t, const QueueRouteQuery>(info.priv.msg);
aicpuRspHead_ = isAicpuEvent_ ? queueRouteQuery->syncEventHead : 0UL;
const uint32_t keyType = queueRouteQuery->queryType;
const EntityInfo src = CreateBasicEntityInfo(queueRouteQuery->srcId,
static_cast<dgw::EntityType>(queueRouteQuery->srcType));
const EntityInfo dst = CreateBasicEntityInfo(queueRouteQuery->dstId,
static_cast<dgw::EntityType>(queueRouteQuery->dstType));
queueRouteQueryList_.clear();
const auto ret = ProcessGetBindMsg(keyType, src, dst);
SendRspEvent(static_cast<int32_t>(ret));
return;
}
BqsStatus RouterServer::ParseGetBindDetailMsg()
{
BQS_LOG_INFO("[RouterServer]Bind relation [get], stage [server:process], type [request]");
if (qsRouterQueryPtr_ == nullptr) {
BQS_LOG_ERROR("[RouterServer]qsRouterQuery should not be null pointer");
return BQS_STATUS_INNER_ERROR;
}
const uint32_t keyType = qsRouterQueryPtr_->queryType;
const EntityInfo src = CreateBasicEntityInfo(qsRouterQueryPtr_->srcId,
static_cast<dgw::EntityType>(qsRouterQueryPtr_->srcType));
const EntityInfo dst = CreateBasicEntityInfo(qsRouterQueryPtr_->dstId,
static_cast<dgw::EntityType>(qsRouterQueryPtr_->dstType));
queueRouteQueryList_.clear();
const auto ret = ProcessGetBindMsg(keyType, src, dst);
return ret;
}
ThreadStatus RouterServer::PrePareForManageThread()
{
auto ret = halEschedAttachDevice(deviceId_);
if ((ret != DRV_ERROR_NONE) && (ret != DRV_ERROR_PROCESS_REPEAT_ADD)) {
BQS_LOG_ERROR("Failed to attach device[%u] for eSched, result[%d].", deviceId_, static_cast<int32_t>(ret));
(void) AttachGroup();
return ThreadStatus::INIT_FAIL;
}
ret = halEschedCreateGrp(deviceId_, bindQueueGroupId_, GRP_TYPE_BIND_CP_CPU);
if (ret != DRV_ERROR_NONE) {
(void)halEschedDettachDevice(deviceId_);
BQS_LOG_ERROR("Failed to create bindQueueGroup, groupId[%u] result[%d].",
bindQueueGroupId_, static_cast<int32_t>(ret));
(void) AttachGroup();
return ThreadStatus::INIT_FAIL;
}
const uint64_t eventBitmap = (1UL << static_cast<uint32_t>(EVENT_QS_MSG));
BQS_LOG_INFO("[RouterServer]BindQueue group[%u] subscribe event, eventBitmap[%lu]",
bindQueueGroupId_, eventBitmap);
ret = halEschedSubscribeEvent(deviceId_, bindQueueGroupId_, 0U, eventBitmap);
if (ret != DRV_ERROR_NONE) {
BQS_LOG_ERROR("[RouterServer]halEschedSubscribeEvent failed, groupId[%u] eventBitmap[%lu] result[%d].",
bindQueueGroupId_, eventBitmap, static_cast<int32_t>(ret));
(void) AttachGroup();
return ThreadStatus::INIT_FAIL;
}
if (!AttachGroup()) {
BQS_LOG_ERROR("[RouterServer] Fail to attach group");
return ThreadStatus::INIT_FAIL;
}
BQS_LOG_RUN_INFO("[RouterServer] ManageQsEvent of RouterServer is ready.");
return ThreadStatus::INIT_SUCCESS;
}
void RouterServer::ManageQsEvent()
{
BQS_LOG_INFO("[RouterServer] Manage QS event of router server thread start");
(void)pthread_setname_np(pthread_self(), ROUTER_SERVER_THREAD_NAME_PREFIX);
if (bqs::GetRunContext() != bqs::RunContext::HOST) {
const std::vector<uint32_t> &cpuIds = QueueScheduleInterface::GetInstance().GetCtrlCpuIds();
const pthread_t threadId = pthread_self();
(void)BindCpuUtils::SetThreadAffinity(threadId, cpuIds);
}
{
std::unique_lock<std::mutex> lk(manageThreadMutex_);
manageThreadStatus_ = PrePareForManageThread();
manageThreadCv_.notify_all();
if (manageThreadStatus_ != ThreadStatus::INIT_SUCCESS) {
return;
}
}
struct event_info event = {};
constexpr int32_t waitTimeout = 2000;
while (running_) {
const auto schedRet = halEschedWaitEvent(deviceId_, bindQueueGroupId_, 0U, waitTimeout, &event);
if (schedRet == DRV_ERROR_NONE) {
if (event.comm.event_id == EVENT_QS_MSG) {
HandleBqsMsg(event);
} else {
BQS_LOG_WARN("Thread[%u] process unsupported eventId[%d] subEventId[%u].",
0, static_cast<int32_t>(event.comm.event_id), event.comm.subevent_id);
}
} else if (schedRet == DRV_ERROR_SCHED_WAIT_TIMEOUT) {
BQS_LOG_DEBUG("ManageQsEvent bind/unbind/query event waiting timeout");
continue;
} else if (schedRet == DRV_ERROR_PARA_ERROR) {
BQS_LOG_ERROR(
"ManageQsEvent bind/unbind/query event failed, deviceId[%u] groupId[%u] error[%d].",
deviceId_, bindQueueGroupId_, static_cast<int32_t>(schedRet));
break;
} else {
BQS_LOG_ERROR(
"ManageQsEvent bind/unbind/query event failed, deviceId[%u] groupId[%u] error[%d].",
deviceId_, bindQueueGroupId_, static_cast<int32_t>(schedRet));
}
}
BQS_LOG_INFO("[RouterServer] ManageQsEvent of RouterServer thread exit.");
}
BqsStatus RouterServer::SubscribeBufEvent() const
{
BQS_LOG_INFO("[RouterServer] SubscribeBufEvent start.");
const bool needSubBufEvent =
static_cast<bool>(schedPolicy_ & static_cast<uint64_t>(SchedPolicy::POLICY_SUB_BUF_EVENT));
if (!needSubBufEvent) {
BQS_LOG_INFO("[RouterServer] needSubBufEvent is [%d]", static_cast<int32_t>(needSubBufEvent));
return BQS_STATUS_OK;
}
dgw::HcclSoManager::GetInstance()->LoadSo();
return BQS_STATUS_OK;
}
BqsStatus RouterServer::WaitSyncMsgProc()
{
std::unique_lock<std::mutex> lk(mutex_);
done_ = false;
auto ret = QueueManager::GetInstance().EnqueueRelationEvent();
if (ret != BQS_STATUS_OK) {
return ret;
}
if (numaFlag_) {
doneExtra_ = false;
auto result = QueueManager::GetInstance().EnqueueRelationEventExtra();
if (result != BQS_STATUS_OK) {
return result;
}
}
BQS_LOG_INFO("[RouterServer] update config[add/del group, bind/unbind route], stage [server:waiting]");
(void)cv_.wait_for(lk, std::chrono::milliseconds(MAX_WAITING_NOTIFY), [this] { return done_ && doneExtra_; });
while ((!done_ || !doneExtra_) && (processing_ || processingExtra_)) {
cv_.wait(lk);
}
if (!done_ || !doneExtra_) {
QueueManager::GetInstance().LogErrorRelationQueueStatus();
BQS_LOG_ERROR("[RouterServer] update config[add/del group, bind/unbind route], stage [server:wait], timeout, "
"relation queue[enqueue cnt:%lu, dequeue cnt:%lu].",
StatisticManager::GetInstance().GetRelationEnqueCnt(),
StatisticManager::GetInstance().GetRelationDequeCnt());
return BQS_STATUS_TIMEOUT;
}
ret = static_cast<BqsStatus>(retCode_);
retCode_ = static_cast<int32_t>(BQS_STATUS_OK);
return ret;
}
void RouterServer::NotifyInitSuccess()
{
BQS_LOG_RUN_INFO("schedule finish initing, now router is ready to handle msg");
readyToHandleMsg_ = true;
}
bool RouterServer::AttachGroup()
{
if (!needAttachGroup_) {
return true;
}
BQS_LOG_INFO("Begin to attach group");
std::stringstream grpNameStream(qsInitGroupName_);
std::string grpNameElement;
std::vector<std::string> groupNameVec;
while (getline(grpNameStream, grpNameElement, ',')) {
groupNameVec.emplace_back(grpNameElement);
}
const int32_t halTimeOut = (RunContext::HOST == GetRunContext()) ? 3000 : -1;
for (const auto &grpName : groupNameVec) {
BQS_LOG_RUN_INFO("Begin to halGrpAttach group[%s].", grpName.c_str());
const auto drvRet = halGrpAttach(grpName.c_str(), halTimeOut);
if (drvRet != static_cast<int32_t>(DRV_ERROR_NONE)) {
BQS_LOG_ERROR("halGrpAttach group[%s] failed. ret[%d]", grpName.c_str(), drvRet);
return false;
}
BQS_LOG_RUN_INFO("halGrpAttach group[%s] success.", grpName.c_str());
}
return true;
}
EntityInfo RouterServer::CreateBasicEntityInfo(const uint32_t id, const dgw::EntityType eType) const
{
OptionalArg args = {};
args.eType = eType;
return EntityInfo(id, deviceId_, &args);
}
}