* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
#include "queue_schedule.h"
#include <unistd.h>
#include <sched.h>
#include <csignal>
#include <cerrno>
#include <string>
#include "driver/ascend_hal.h"
#include "server/bqs_server.h"
#include "server/router_server.h"
#include "bind_relation.h"
#include "bind_cpu_utils.h"
#include "statistic_manager.h"
#include "subscribe_manager.h"
#include "queue_manager.h"
#include "profile_manager.h"
#include "entity_manager.h"
#include "hccl_process.h"
#include "common/bqs_util.h"
#include "queue_schedule_sub_module_interface.h"
#include "qs_interface_process.h"
#include "queue_schedule_hal_interface_ref.h"
#include "tsd.h"
#include "schedule_config.h"
#include "dynamic_sched_mgr.hpp"
#include "fsm/state_define.h"
#include "common/bqs_feature_ctrl.h"
#include "qs_args_parser.h"
#include "queue_schedule_feature_ctrl.h"
namespace bqs {
namespace {
constexpr uint32_t HOST_NAME_MAX_LEN = 128U;
constexpr const char_t *AOS_SD = "AOS_SD";
constexpr const size_t FIRST_ARRAY_INDEX = 0LU;
constexpr const uint32_t DAEMON_WAIT_TIMEOUT = 30U;
constexpr const uint32_t HOST_ENQUEUE_THREAD_NUM = 10U;
constexpr const uint32_t HOST_F2NF_THREAD_NUM = 10U;
constexpr const uint32_t MAX_QOS_NUM_FOR_HCCL_EVENT = 12U;
constexpr const uint32_t MAX_QOS_NUM_FOR_F2NF_EVENT = 1U;
constexpr const char_t *ENQUEUE_THREAD_NAME_PREFIX = "enqueue_";
constexpr const char_t *F2NF_THREAD_NAME_PREFIX = "f2nf_";
constexpr const char_t *DAEMON_THREAD_NAME_PREFIX = "daemon";
constexpr const uint32_t ERROR_LOG_SAMPLE_INTERVAL = 1000U;
void DynamicScheduleByResponse(const uint32_t key, const uint32_t index,
const std::vector<dgw::DynamicSchedMgr::ResponseInfo> &responses)
{
BQS_LOG_INFO("responses size is %zu", responses.size());
for (const auto &response : responses) {
dgw::EntityPtr dynamicSrcEntity =
dgw::EntityManager::Instance(index).GetSrcEntityByGlobalId(key, response.src.queueLogicId);
if (dynamicSrcEntity == nullptr) {
BQS_LOG_ERROR("Can't get entity by key[%u], globalId[%u]", key, response.src.queueLogicId);
continue;
}
uint32_t updateCount = 0U;
const auto &dataResults = response.groupResults;
for (const auto groupResult : dataResults) {
dgw::EntityPtr dynamicDstGrpEnity =
dgw::EntityManager::Instance(index).GetDstEntityByGlobalId(key, groupResult.logicGroupId);
if (dynamicDstGrpEnity == nullptr) {
BQS_LOG_ERROR("Can't get entity by key[%u], globalId[%u]", key, groupResult.logicGroupId);
continue;
}
const std::vector<dgw::EntityPtr> &entitiesInGroup =
dgw::EntityManager::Instance(index).GetEntitiesInGroup(dynamicDstGrpEnity->GetId());
if (entitiesInGroup.size() <= groupResult.index) {
BQS_LOG_ERROR("Dynamic response's index[%u] is larger than group size[%zu]",
groupResult.index, entitiesInGroup.size());
continue;
}
const dgw::EntityPtr dynamicDstInGroup = entitiesInGroup[groupResult.index];
if (dynamicSrcEntity->UpdateSendObject(dynamicDstGrpEnity, dynamicDstInGroup)) {
++updateCount;
}
}
BQS_LOG_INFO("updateCount is %u", updateCount);
if (updateCount > 0U) {
BQS_LOG_INFO("%s processMessage", dynamicSrcEntity->ToString().c_str());
dgw::DynamicSchedMgr::GetInstance(index).DynamicSchedDurationEnd(dynamicSrcEntity->GetDynamicReqTime());
dgw::InnerMessage msg;
msg.msgType = dgw::InnerMsgType::INNER_MSG_PUSH;
(void) dynamicSrcEntity->ProcessMessage(msg);
}
}
}
thread_local static int32_t thread_groupId;
thread_local static int32_t thread_deviceId;
int32_t getGrpId(int32_t tag, int32_t *grpId, int32_t *deviceId)
{
(void) tag;
if ((grpId == nullptr) || (deviceId == nullptr)) {
return -1;
}
*grpId = thread_groupId;
*deviceId = thread_deviceId;
return 0;
}
}
BqsStatus QueueSchedule::StartQueueSchedule()
{
auto ret = BqsServer::GetInstance().InitBqsServer(qsInitGroupName_, deviceId_);
if (ret != BQS_STATUS_OK) {
BQS_LOG_ERROR("BqsServer Init failed, ret=%d.", ret);
return ret;
}
ret = RouterServer::GetInstance().InitRouterServer(initQsParams_);
if (ret != BQS_STATUS_OK) {
BQS_LOG_ERROR("RouterServer Init failed, ret=%d.", ret);
return ret;
}
GlobalCfg::GetInstance().SetNumaFlag(initQsParams_.numaFlag);
GlobalCfg::GetInstance().RecordDeviceId(deviceId_, 0U, enqueGroupId_);
reschedInterval_ = (reschedInterval_ == 0U) ? DAEMON_WAIT_TIMEOUT : reschedInterval_;
abnormalInterval_ = ((initQsParams_.abnormalInterVal > ABNORMAL_INTERVAL_MIN) &&
(initQsParams_.abnormalInterVal < ABNORMAL_INTERVAL_MAX)) ?
initQsParams_.abnormalInterVal : ABNORMAL_INTERVAL_DEFAULT;
StatisticManager::GetInstance().StartStatisticManager(
abnormalInterval_, initQsParams_.pid, initQsParams_.numaFlag, initQsParams_.deviceIdExtra,
initQsParams_.enqueGroupIdExtra);
char_t hostNameStr[HOST_NAME_MAX_LEN] = {};
const int32_t getNameRet = gethostname(&hostNameStr[FIRST_ARRAY_INDEX], HOST_NAME_MAX_LEN);
if (getNameRet < 0) {
BQS_LOG_ERROR("gethostname failed, ret=%d.", getNameRet);
return BQS_STATUS_INNER_ERROR;
}
const std::string nameStr(hostNameStr);
uint32_t threadNum = 0U;
if (nameStr == AOS_SD) {
hasAICPU_ = false;
threadNum = 1U;
}
BQS_LOG_INFO("StartQueueSchedule schedPolicy:[%lu].", initQsParams_.schedPolicy);
if (hasAICPU_ && (bqs::RunContext::HOST != bqs::GetRunContext())) {
uint32_t aicpuNum = QueueScheduleInterface::GetInstance().GetAiCpuNum();
BQS_LOG_RUN_INFO("the number of AICPU cores: %d", aicpuNum);
if (aicpuNum == 0U) {
isZeroSizeAicpuNum_ = true;
threadNum = 1U;
} else {
threadNum = aicpuNum;
}
}
BQS_LOG_RUN_INFO("Has aicpu:%d, numaFlag:%d, deviceId_:%u.",
static_cast<int32_t>(hasAICPU_), initQsParams_.numaFlag, deviceId_);
aicpuFeatureDisableRecvRequestEvent_ = (bqs::GetRunContext() == bqs::RunContext::HOST) ? false : QSFeatureCtrl::ShouldDisableRecvRequestEvent(deviceId_);
aicpuFeatureSetPidPriority_ = (bqs::GetRunContext() == bqs::RunContext::HOST) ? false : QSFeatureCtrl::ShouldSetPidPriority(deviceId_);
ret = InitDrvSchedModule(deviceId_, enqueGroupId_, f2nfGroupId_);
if (ret != BQS_STATUS_OK) {
BQS_LOG_ERROR("InitDrvSchedModule failed, ret=%d.", ret);
return ret;
}
ret = QueueManager::GetInstance().InitQueueManager(deviceId_, enqueGroupId_, hasAICPU_, qsInitGroupName_);
if (ret != BQS_STATUS_OK) {
BQS_LOG_ERROR("QueueManager Init failed, ret=%d.", ret);
return ret;
}
std::set<uint32_t> resDevids(initQsParams_.devIdVec.begin(), initQsParams_.devIdVec.end());
resDevids.insert(deviceId_);
if (initQsParams_.numaFlag) {
resDevids.insert(initQsParams_.deviceIdExtra);
}
Subscribers::GetInstance().InitSubscribeManagers(resDevids, deviceId_);
ProfileManager::GetInstance(0U).InitProfileManager(deviceId_);
dgw::EntityManager::Instance(0U).SetSubscriptionPausePolicy(
(initQsParams_.schedPolicy & static_cast<uint64_t>(SchedPolicy::POLICY_UNSUB_F2NF)) == 0UL);
running_ = true;
if (BindCpuUtils::InitSem() != BQS_STATUS_OK) {
BQS_LOG_ERROR("InitSem failed");
return BQS_STATUS_INNER_ERROR;
}
auto threadRet = StartThreadGroup(threadNum, deviceId_, enqueGroupId_, 0U);
if (threadRet != BQS_STATUS_OK) {
BQS_LOG_ERROR("StartThreadGroup failed");
BindCpuUtils::DestroySem();
return threadRet;
}
if (initQsParams_.numaFlag) {
const auto extraRet = InitExtraSchedule(resDevids, threadNum);
if (extraRet != BQS_STATUS_OK) {
BQS_LOG_ERROR("InitExtraSchedule failed, ret is %d.", static_cast<int32_t>(extraRet));
BindCpuUtils::DestroySem();
return extraRet;
}
}
BindCpuUtils::DestroySem();
RouterServer::GetInstance().NotifyInitSuccess();
return BQS_STATUS_OK;
}
BqsStatus QueueSchedule::InitExtraSchedule(const std::set<uint32_t> &resDevids, uint32_t threadNum)
{
BqsStatus ret = InitDrvSchedModule(initQsParams_.deviceIdExtra, initQsParams_.enqueGroupIdExtra,
initQsParams_.f2nfGroupIdExtra);
if (ret != BQS_STATUS_OK) {
BQS_LOG_ERROR("InitDrvSchedModule failed, ret=%d.", static_cast<int32_t>(ret));
return ret;
}
GlobalCfg::GetInstance().RecordDeviceId(initQsParams_.deviceIdExtra, 1U, initQsParams_.enqueGroupIdExtra);
QueueManager::GetInstance().InitExtra(initQsParams_.deviceIdExtra, initQsParams_.enqueGroupIdExtra);
Subscribers::GetInstance().InitSubscribeManagers(resDevids, initQsParams_.deviceIdExtra);
ProfileManager::GetInstance(1U).InitProfileManager(initQsParams_.deviceIdExtra);
dgw::EntityManager::Instance(1U).SetSubscriptionPausePolicy(
(initQsParams_.schedPolicy & static_cast<uint64_t>(SchedPolicy::POLICY_UNSUB_F2NF)) == 0UL);
if (hasAICPU_ && (bqs::RunContext::HOST != bqs::GetRunContext())) {
uint32_t aicpuNum = QueueScheduleInterface::GetInstance().GetExtraAiCpuNum();
BQS_LOG_RUN_INFO("the number of Extra AICPU cores: %d", aicpuNum);
if (aicpuNum == 0U) {
aicpuNum = 1U;
}
threadNum = aicpuNum;
}
BqsStatus threadRet = StartThreadGroup(threadNum, initQsParams_.deviceIdExtra, initQsParams_.enqueGroupIdExtra, 1U);
if (threadRet != BQS_STATUS_OK) {
BQS_LOG_ERROR("StartThreadGroup failed");
return threadRet;
}
const auto setCallbackRes = HcclSetGrpIdCallback(getGrpId);
if (setCallbackRes != HCCL_SUCCESS) {
BQS_LOG_ERROR("SetCallback failed, res is %d", static_cast<int32_t>(setCallbackRes));
return BQS_STATUS_HCCL_ERROR;
}
return BQS_STATUS_OK;
}
BqsStatus QueueSchedule::StartThreadGroup(const uint32_t threadNum, const uint32_t deviceId,
const uint32_t enqueGroupId, const uint32_t index)
{
const sighandler_t oldHandler = signal(SIGCHLD, static_cast<sighandler_t>(SIG_DFL));
const uint32_t enqueueThreadNum = (bqs::RunContext::HOST == bqs::GetRunContext()) ? HOST_ENQUEUE_THREAD_NUM
: threadNum;
uint32_t vDevNum = 0U;
if ((FeatureCtrl::IsVfModeCheckedByDeviceId(deviceId)) && (&halGetVdevNum != nullptr)) {
int32_t ret = halGetVdevNum(&vDevNum);
if (ret != 0) {
BQS_LOG_ERROR("halGetVdevNum, failed result[%d]", ret);
return BQS_STATUS_DRIVER_ERROR;
}
}
BQS_LOG_INFO("Get vdev num=[%u] success.", vDevNum);
for (uint32_t thIndex = 0U; thIndex < enqueueThreadNum; ++thIndex) {
uint32_t aicpuIndex = 0U;
if (bqs::RunContext::HOST != bqs::GetRunContext()) {
if (vDevNum > 0U) {
aicpuIndex = (index == 0U) ?
QueueScheduleInterface::GetInstance().GetAicpuPhysIndexInVfMode(thIndex, deviceId) :
QueueScheduleInterface::GetInstance().GetExtraAicpuPhysIndexInVfMode(thIndex, deviceId);
} else {
aicpuIndex = (index == 0U) ?
QueueScheduleInterface::GetInstance().GetAicpuPhysIndex(deviceId, thIndex) :
QueueScheduleInterface::GetInstance().GetExtraAicpuPhysIndex(deviceId, thIndex);
}
}
(void) workThreads_.emplace_back(
&QueueSchedule::EnqueueThreadTask, this, deviceId, thIndex, aicpuIndex, enqueGroupId, index);
}
if (bqs::RunContext::HOST != bqs::GetRunContext()) {
(void)workThreads_.emplace_back(&QueueSchedule::DaemonThreadTask, this, index);
}
uint32_t f2nfThreadNum = (bqs::RunContext::HOST == bqs::GetRunContext()) ? HOST_F2NF_THREAD_NUM : threadNum;
if (initQsParams_.numaFlag) {
f2nfThreadNum = 0U;
}
for (uint32_t thIndex = 0U; thIndex < f2nfThreadNum; ++thIndex) {
uint32_t aicpuIndex = 0U;
if (bqs::RunContext::HOST != bqs::GetRunContext()) {
if (vDevNum > 0U) {
aicpuIndex = QueueScheduleInterface::GetInstance().GetAicpuPhysIndexInVfMode(thIndex, deviceId);
} else {
aicpuIndex = QueueScheduleInterface::GetInstance().GetAicpuPhysIndex(deviceId_, thIndex);
}
}
(void) workThreads_.emplace_back(
&QueueSchedule::F2NFThreadTask, this, thIndex, aicpuIndex, f2nfGroupId_);
}
for (uint32_t thIndex = 0U; thIndex < enqueueThreadNum + f2nfThreadNum; ++thIndex) {
if (BindCpuUtils::WaitSem() != BQS_STATUS_OK) {
BQS_LOG_ERROR("WaitSem failed");
return BQS_STATUS_INNER_ERROR;
}
}
(void)signal(SIGCHLD, oldHandler);
return BQS_STATUS_OK;
}
void QueueSchedule::StopQueueSchedule()
{
running_ = false;
const std::unique_lock<std::mutex> daemonWaitLock(daemonWaitMtx_);
daemonWait_.notify_all();
StatisticManager::GetInstance().DumpOutProcMemStatInfo();
StatisticManager::GetInstance().StopStatisticManager();
ProfileManager::GetInstance(0U).Uninit();
if (initQsParams_.numaFlag) {
ProfileManager::GetInstance(1U).Uninit();
}
}
void QueueSchedule::Destroy() const
{
QueueManager::GetInstance().Destroy();
if (!SubModuleInterface::GetInstance().GetStartFlag()) {
(void)halEschedDettachDevice(deviceId_);
if (initQsParams_.numaFlag) {
(void)halEschedDettachDevice(initQsParams_.deviceIdExtra);
}
} else {
BQS_LOG_RUN_INFO("sub module no need process detach main module do it");
}
}
void QueueSchedule::EnqueueThreadTask(const uint32_t deviceId, const uint32_t threadIndex, const uint32_t bindCpuIndex,
const uint32_t groupId, const uint32_t index)
{
BQS_LOG_INFO("QueueSchedule enqueue thread[%u] start.", threadIndex);
BindAicpu(threadIndex, bindCpuIndex);
const auto threadName = std::string(ENQUEUE_THREAD_NAME_PREFIX).append(std::to_string(threadIndex));
(void)pthread_setname_np(pthread_self(), threadName.c_str());
thread_groupId = groupId;
thread_deviceId = deviceId;
const uint64_t eventBitmap =
static_cast<uint64_t>(1LU << static_cast<uint64_t>(EVENT_QUEUE_ENQUEUE)) |
static_cast<uint64_t>(1LU << static_cast<uint64_t>(EVENT_QUEUE_FULL_TO_NOT_FULL)) |
static_cast<uint64_t>(1LU << static_cast<uint64_t>(dgw::EVENT_RECV_REQUEST_MSG)) |
static_cast<uint64_t>(1LU << static_cast<uint64_t>(dgw::EVENT_SEND_COMPLETION_MSG)) |
static_cast<uint64_t>(1LU << static_cast<uint64_t>(dgw::EVENT_RECV_COMPLETION_MSG)) |
static_cast<uint64_t>(1LU << static_cast<uint64_t>(dgw::EVENT_CONGESTION_RELIEF_MSG));
BQS_LOG_INFO("Enque group[%u] subscribe event, eventBitmap[%lu] deviceId[%u]", groupId, eventBitmap, deviceId);
const int32_t ret = halEschedSubscribeEvent(deviceId, groupId, threadIndex, eventBitmap);
if (ret != DRV_ERROR_NONE) {
BQS_LOG_ERROR("halEschedSubscribeEvent failed, groupId[%u] eventBitmap[%lu] result[%d].",
groupId, eventBitmap, ret);
StopQueueSchedule();
return;
}
event_sched_grp_qos qos = {};
const std::vector<uint32_t> eventList = {dgw::EVENT_RECV_REQUEST_MSG, dgw::EVENT_SEND_COMPLETION_MSG,
dgw::EVENT_RECV_COMPLETION_MSG, EVENT_QUEUE_FULL_TO_NOT_FULL};
for (const uint32_t eventId : eventList) {
qos.maxNum = (eventId == static_cast<uint32_t>(EVENT_QUEUE_FULL_TO_NOT_FULL)) ?
MAX_QOS_NUM_FOR_F2NF_EVENT : MAX_QOS_NUM_FOR_HCCL_EVENT;
const auto drvRet = halEschedSetGrpEventQos(deviceId, groupId, static_cast<EVENT_ID>(eventId), &qos);
if (drvRet != DRV_ERROR_NONE) {
BQS_LOG_ERROR("Failed to call halEschedSetGrpEventQos, groupId[%u], qos.maxNum[%u], ret[%d].",
groupId, qos.maxNum, static_cast<int32_t>(drvRet));
StopQueueSchedule();
return;
}
}
LoopProcessEnqueueEvent(threadIndex, deviceId, groupId, index);
}
void QueueSchedule::BindAicpu(const uint32_t threadIndex, const uint32_t bindCpuIndex)
{
if (bqs::RunContext::HOST != bqs::GetRunContext()) {
if (!hasAICPU_) {
struct sched_param param;
param.sched_priority = sched_get_priority_max(SCHED_FIFO);
if (sched_setscheduler(0, SCHED_FIFO, ¶m) == -1) {
BQS_LOG_ERROR("QueueSchedule sched_setscheduler failed, errno:%d, thread exit.", errno);
StopQueueSchedule();
}
} else {
if (!isZeroSizeAicpuNum_) {
const int32_t status = BindCpuUtils::BindAicpu(bindCpuIndex);
if (status != BQS_STATUS_OK) {
BQS_LOG_ERROR(
"QueueSchedule enqueue thread[%u] bind cpu[%u] failed, thread exit.",
threadIndex, bindCpuIndex);
StopQueueSchedule();
}
}
}
}
if (BindCpuUtils::PostSem() != BQS_STATUS_OK) {
BQS_LOG_ERROR("WaitPost failed");
}
}
void QueueSchedule::CheckIfRecover(uint32_t &errCount, const char_t * const identity, const uint32_t threadIndex,
const uint32_t groupId) const
{
if (errCount != 0U) {
errCount = 0U;
BQS_LOG_ERROR("halEschedWaitEvent %s event recover, threadIndex[%u] groupId[%u]",
identity, threadIndex, groupId);
}
}
void QueueSchedule::LoopProcessEnqueueEvent(const uint32_t threadIndex, const uint32_t deviceId,
const uint32_t groupId, const uint32_t index)
{
QueueManager::GetInstance().NotifyInitSuccess(index);
struct event_info event = {};
constexpr int32_t waitTimeout = 2000;
uint32_t errCount = 0U;
while (running_) {
StatisticManager::GetInstance().RefreshEnqueHeartBeat();
const int32_t schedRet = halEschedWaitEvent(deviceId, groupId, threadIndex, waitTimeout, &event);
if (schedRet == DRV_ERROR_NONE) {
CheckIfRecover(errCount, "enqueue", threadIndex, groupId);
StatisticManager::GetInstance().AwakenAdd();
ProcessEvent(threadIndex, event, index);
} else if (schedRet == DRV_ERROR_SCHED_WAIT_TIMEOUT) {
CheckIfRecover(errCount, "enqueue", threadIndex, groupId);
BQS_LOG_DEBUG("halEschedWaitEvent enqueue event timeout, groupId=%u, thread index:%u",
groupId, threadIndex);
continue;
} else if (schedRet == DRV_ERROR_PARA_ERROR) {
BQS_LOG_ERROR(
"halEschedWaitEvent enqueue event failed, deviceId[%u] threadIndex[%u] groupId[%u] error[%d].",
deviceId, threadIndex, groupId, schedRet);
break;
} else {
if (errCount++ == 0U) {
BQS_LOG_ERROR(
"halEschedWaitEvent enqueue event failed, deviceId[%u] threadIndex[%u] groupId[%u] error[%d].",
deviceId, threadIndex, groupId, schedRet);
}
}
}
BQS_LOG_INFO("QueueSchedule enqueue thread[%u] exit", threadIndex);
}
void QueueSchedule::F2NFThreadTask(const uint32_t threadIndex, const uint32_t bindCpuIndex, const uint32_t groupId)
{
BQS_LOG_INFO("Queue Schedule f2nf thread[%u] start.", threadIndex);
const auto threadName = std::string(F2NF_THREAD_NAME_PREFIX).append(std::to_string(threadIndex));
(void)pthread_setname_np(pthread_self(), threadName.c_str());
BindAicpu(threadIndex, bindCpuIndex);
const uint64_t eventBitmap =
static_cast<uint64_t>(1LU << static_cast<uint64_t>(dgw::EVENT_RECV_REQUEST_MSG)) |
static_cast<uint64_t>(1LU << static_cast<uint64_t>(dgw::EVENT_SEND_COMPLETION_MSG)) |
static_cast<uint64_t>(1LU << static_cast<uint64_t>(dgw::EVENT_RECV_COMPLETION_MSG)) |
static_cast<uint64_t>(1LU << static_cast<uint64_t>(dgw::EVENT_CONGESTION_RELIEF_MSG));
BQS_LOG_INFO("F2NF group[%u] subscribe event, eventBitmap[%lu]", f2nfGroupId_, eventBitmap);
const auto ret = halEschedSubscribeEvent(deviceId_, f2nfGroupId_, threadIndex, eventBitmap);
if (ret != DRV_ERROR_NONE) {
BQS_LOG_ERROR("halEschedSubscribeEvent failed, groupId[%u] eventBitmap[%lu] result[%d].",
f2nfGroupId_, eventBitmap, static_cast<int32_t>(ret));
StopQueueSchedule();
return;
}
event_sched_grp_qos qos = {};
const std::vector<uint32_t> eventList = {dgw::EVENT_RECV_REQUEST_MSG, dgw::EVENT_SEND_COMPLETION_MSG,
dgw::EVENT_RECV_COMPLETION_MSG};
for (const uint32_t eventId : eventList) {
qos.maxNum = MAX_QOS_NUM_FOR_HCCL_EVENT;
const auto drvRet = halEschedSetGrpEventQos(deviceId_, f2nfGroupId_, static_cast<EVENT_ID>(eventId), &qos);
if (drvRet != DRV_ERROR_NONE) {
BQS_LOG_ERROR("Failed to call halEschedSetGrpEventQos, groupId[%u], qos.maxNum[%u], ret[%d].",
f2nfGroupId_, qos.maxNum, static_cast<int32_t>(drvRet));
StopQueueSchedule();
return;
}
}
struct event_info event = {};
constexpr int32_t waitTimeout = 2000;
uint32_t errCount = 0U;
while (running_) {
const int32_t schedRet = halEschedWaitEvent(deviceId_, groupId, threadIndex, waitTimeout, &event);
if (schedRet == DRV_ERROR_NONE) {
CheckIfRecover(errCount, "f2nf", threadIndex, groupId);
(void)ProcessEvent(threadIndex, event, 0U);
} else if (schedRet == DRV_ERROR_SCHED_WAIT_TIMEOUT) {
CheckIfRecover(errCount, "f2nf", threadIndex, groupId);
BQS_LOG_DEBUG("halEschedWaitEvent f2nf event timeout,thread index:%u", threadIndex);
continue;
} else if (schedRet == DRV_ERROR_PARA_ERROR) {
BQS_LOG_ERROR(
"halEschedWaitEvent f2nf event failed, deviceId[%u] threadIndex[%u] groupId[%u] error[%d].",
deviceId_, threadIndex, groupId, schedRet);
break;
} else {
if (errCount++ == 0U) {
BQS_LOG_ERROR(
"halEschedWaitEvent f2nf event failed, deviceId[%u] threadIndex[%u] groupId[%u] error[%d].",
deviceId_, threadIndex, groupId, schedRet);
}
}
}
BQS_LOG_INFO("Queue Schedule f2nf thread[%u] exit", threadIndex);
}
BqsStatus QueueSchedule::ProcessEvent(const uint32_t threadIndex, event_info &event, const uint32_t index)
{
auto ret = dgw::FsmStatus::FSM_SUCCESS;
const uint32_t eventId = event.comm.event_id;
const uint32_t deviceId = (index == 0) ? deviceId_ : initQsParams_.deviceIdExtra;
switch (eventId) {
case static_cast<uint32_t>(EVENT_QUEUE_ENQUEUE): {
BQS_LOG_INFO("the [%u]th thread[%u] recv enqueEvent", index, threadIndex);
ProcessEnqueueEvent(threadIndex, event, index, false);
break;
}
case static_cast<uint32_t>(EVENT_QUEUE_FULL_TO_NOT_FULL): {
BQS_LOG_INFO("the [%u]th thread[%u] recv f2nfEvent", index, threadIndex);
ProcessEnqueueEvent(threadIndex, event, index, true);
break;
}
case dgw::EVENT_RECV_REQUEST_MSG: {
ret = dgw::HcclProcess::GetInstance().ProcessRecvRequestEvent(event, deviceId, index);
break;
}
case dgw::EVENT_SEND_COMPLETION_MSG: {
ret = dgw::HcclProcess::GetInstance().ProcessSendCompletionEvent(event, deviceId, index);
break;
}
case dgw::EVENT_RECV_COMPLETION_MSG: {
ret = dgw::HcclProcess::GetInstance().ProcessRecvCompletionEvent(event, deviceId, index);
break;
}
case dgw::EVENT_CONGESTION_RELIEF_MSG: {
ret = dgw::HcclProcess::GetInstance().ProcessCongestionReliefEvent(event, deviceId, index);
break;
}
default: {
BQS_LOG_WARN("Unsupported event[%u].", eventId);
ret = dgw::FsmStatus::FSM_FAILED;
break;
}
}
if (ret != dgw::FsmStatus::FSM_SUCCESS) {
return BqsStatus::BQS_STATUS_INNER_ERROR;
}
return BqsStatus::BQS_STATUS_OK;
}
void QueueSchedule::DaemonThreadTask(const uint32_t index)
{
BQS_LOG_INFO("Queue Schedule Daemon thread start.");
(void)pthread_setname_np(pthread_self(), DAEMON_THREAD_NAME_PREFIX);
const uint32_t deviceId = (index == 0) ? deviceId_ : initQsParams_.deviceIdExtra;
const uint32_t enqueGroupId = (index == 0) ? enqueGroupId_ : initQsParams_.enqueGroupIdExtra;
BindCpuUtils::SetThreadFIFO(deviceId);
std::unique_lock<std::mutex> daemonWaitLock(daemonWaitMtx_);
uint64_t awakenTimes = 0UL;
QueueSubscriber subscriber;
subscriber.devId = deviceId;
subscriber.spGrpId = 0;
subscriber.pid = static_cast<int32_t>(getpid());
subscriber.groupId = static_cast<int32_t>(enqueGroupId);
while ((daemonWait_.wait_for(daemonWaitLock, std::chrono::milliseconds(reschedInterval_)) ==
std::cv_status::timeout) && (running_)) {
if (StatisticManager::GetInstance().GetEventScheduleStat() == 0U) {
continue;
}
const uint64_t newAwakenTimes = StatisticManager::GetInstance().GetAwakenTimes();
if (awakenTimes == newAwakenTimes) {
int32_t ret = halQueueCtrlEvent(&subscriber, QUE_PAUSE_EVENT);
if (ret == DRV_ERROR_NONE) {
DaemonEnqueueEvent(index);
ret = halQueueCtrlEvent(&subscriber, QUE_RESUME_EVENT);
if (ret != DRV_ERROR_NONE) {
BQS_LOG_ERROR(
"halQueueCtrlEvent QUE_RESUME_EVENT failed, deviceId_[%u] enqueGroupId_[%u] ret[%d].",
deviceId, enqueGroupId, ret);
}
} else {
BQS_LOG_RUN_WARN("halQueueCtrlEvent QUE_PAUSE_EVENT failed, deviceId_[%u] enqueGroupId_[%u] ret[%d].",
deviceId, enqueGroupId, ret);
}
} else {
awakenTimes = newAwakenTimes;
}
}
BQS_LOG_INFO("Queue Schedule Daemon thread exit");
}
void QueueSchedule::ProcessEnqueueEvent(const uint32_t threadIndex, const event_info &event, const uint32_t index,
const bool procF2NF)
{
auto &queueEventAtomicFlag = (index == 0U) ? queueEventAtomicFlag_ : queueEventAtomicFlagExtra_;
if (!queueEventAtomicFlag.test_and_set()) {
ProfileManager &profileManager = ProfileManager::GetInstance(index);
const uint64_t eventBegin = profileManager.GetCpuTick();
const uint64_t schedDelay = static_cast<uint64_t>(event.comm.sched_timestamp - event.comm.submit_timestamp);
const uint64_t schedTimes = StatisticManager::GetInstance().EventScheduleStat();
profileManager.InitMaker(schedTimes, schedDelay);
const bool procRelation = QueueManager::GetInstance().HandleRelationEvent(index);
const uint64_t f2NFBegin = profileManager.GetCpuTick();
profileManager.SetRelationCost(f2NFBegin - eventBegin);
bool hasF2NF = QueueManager::GetInstance().HandleFullToNotFullEvent(index);
profileManager.Setf2NFCost(profileManager.GetCpuTick() - f2NFBegin);
hasF2NF |= procF2NF;
const bool procAsynMemBuff = QueueManager::GetInstance().HandleAsynMemBuffEvent(index);
const uint64_t scheduleBegin = profileManager.GetCpuTick();
ScheduleDataBuffAll(!(procRelation || hasF2NF || procAsynMemBuff), index);
StatisticManager::GetInstance().UpdateScheuleStatistic(profileManager.GetTimeCost(schedDelay),
profileManager.GetTimeCost(profileManager.GetCpuTick() - scheduleBegin));
profileManager.TryMarker(eventBegin);
queueEventAtomicFlag.clear();
return;
}
if (procF2NF) {
ProcessFullToNotFullEvent(index);
return;
}
bqs::StatisticManager::GetInstance().EnqueueEventFalseAwakenStat();
BQS_LOG_DEBUG("Thread[%u] can't work as other thread is working.", threadIndex);
}
void QueueSchedule::ProcessFullToNotFullEvent(const uint32_t index)
{
bqs::StatisticManager::GetInstance().F2nfEventStat();
(void)QueueManager::GetInstance().EnqueueFullToNotFullEvent(index);
}
void QueueSchedule::DaemonEnqueueEvent(const uint32_t index)
{
auto &queueEventAtomicFlag = (index == 0U) ? queueEventAtomicFlag_ : queueEventAtomicFlagExtra_;
if (!queueEventAtomicFlag.test_and_set()) {
StatisticManager::GetInstance().DaemonEventScheduleStat();
const bool procRelation = QueueManager::GetInstance().HandleRelationEvent(index);
ScheduleDataBuffAll(!procRelation, index);
queueEventAtomicFlag.clear();
} else {
BQS_LOG_WARN("Daemon thread can't work as other thread is working, may event error.");
}
}
void QueueSchedule::DynamicSchedule(const uint32_t index) const
{
const auto &dynamicCfgKeys = dgw::ScheduleConfig::GetInstance().GetSchedKeys();
BQS_LOG_DEBUG("dynamicCfgKeys size is %zu", dynamicCfgKeys.size());
if (dynamicCfgKeys.empty()) {
return;
}
uint32_t dynamicScheduleCount = 0U;
for (const auto key: dynamicCfgKeys) {
if (dgw::ScheduleConfig::GetInstance().IsStopped(key)) {
BQS_LOG_INFO("key[%u] is stopped, then skip", key);
continue;
}
while (dynamicScheduleCount++ < 100U) {
std::vector<dgw::DynamicSchedMgr::ResponseInfo> responses;
const auto getResponseRet = dgw::DynamicSchedMgr::GetInstance(index).GetResponse(key, responses);
if ((getResponseRet != dgw::FsmStatus::FSM_SUCCESS) || (responses.size() == 0U)) {
BQS_LOG_DEBUG("Can't get response, ret is %d", static_cast<int32_t>(getResponseRet));
break;
}
DynamicScheduleByResponse(key, index, responses);
}
}
BQS_LOG_DEBUG("finish DynamicSchedule, dynamicScheduleCount is %u", dynamicScheduleCount);
}
void QueueSchedule::ScheduleDataBuffAll(const bool dataEnqueue, const uint32_t index) const
{
BQS_LOG_INFO("the [%u]th thread ScheduleDataBuffAll.", index);
bool hasDequeueFlag = false;
const auto &orderedSubscribeQueues = (index == 0U) ?
BindRelation::GetInstance().GetOrderedSubscribeQueueId() :
BindRelation::GetInstance().GetOrderedSubscribeQueueIdExtra();
const auto &srcToDstRelation = (index == 0U) ?
BindRelation::GetInstance().GetSrcToDstRelation() :
BindRelation::GetInstance().GetSrcToDstExtraRelation();
ProfileManager::GetInstance(index).SetSrcQueueNum(static_cast<uint32_t>(orderedSubscribeQueues.size()));
dgw::InnerMessage msg;
msg.msgType = dgw::InnerMsgType::INNER_MSG_PUSH;
BindRelation::GetInstance().ClearAbnormalEntityInfo(index);
if (dgw::EntityManager::Instance(index).IsExistFullEntity() ||
dgw::EntityManager::Instance(index).IsExistAsyncMemEntity()) {
for (const auto &src : orderedSubscribeQueues) {
if (dgw::ScheduleConfig::GetInstance().IsStopped(src.GetSchedCfgKey())) {
BQS_LOG_INFO("Skip schedule src[%s] for it has been stopped.", src.ToString().c_str());
continue;
}
const auto iter = srcToDstRelation.find(src);
if (iter == srcToDstRelation.end()) {
BQS_LOG_WARN("Can't find dst queues for queue[%u].", src.GetId());
continue;
}
for (auto &dst : iter->second) {
if (ProcessDstEntity(dst, index) == dgw::FsmStatus::FSM_ERROR) {
BQS_LOG_ERROR("Skip scheduler for routes maybe has been modified");
break;
};
}
const auto &srcEntity = src.GetEntity();
if (srcEntity->ProcessMessage(msg) == dgw::FsmStatus::FSM_ERROR) {
BQS_LOG_ERROR("skip scheduler for routes maybe have been modified");
break;
}
hasDequeueFlag |= (srcEntity->GetScheduleCount() > 0UL);
}
} else {
for (const auto &src : orderedSubscribeQueues) {
if (dgw::ScheduleConfig::GetInstance().IsStopped(src.GetSchedCfgKey())) {
BQS_LOG_INFO("Skip schedule src[%s] for it has been stopped.", src.ToString().c_str());
continue;
}
const auto &srcEntity = src.GetEntity();
if (srcEntity->ProcessMessage(msg) == dgw::FsmStatus::FSM_ERROR) {
BQS_LOG_ERROR("skip scheduler for routes maybe have been modified");
break;
}
hasDequeueFlag |= (srcEntity->GetScheduleCount() > 0UL);
}
}
DynamicSchedule(index);
if (hasDequeueFlag && (!aicpuFeatureDisableRecvRequestEvent_)) {
(void)dgw::EntityManager::Instance(index).SupplyRecvRequestEvent();
}
if ((!hasDequeueFlag) && dataEnqueue) {
StatisticManager::GetInstance().AddScheduleEmpty();
}
BindRelation::GetInstance().UpdateRelation(index);
}
dgw::FsmStatus QueueSchedule::ProcessDstEntity(const EntityInfo &entity, const uint32_t index) const
{
const auto dstEntity = entity.GetEntity();
if (dstEntity == nullptr) {
BQS_LOG_ERROR("Get entity ptr for entity[%s] failed.", entity.ToString().c_str());
return dgw::FsmStatus::FSM_FAILED;
}
if (entity.GetType() != dgw::EntityType::ENTITY_GROUP) {
BQS_LOG_DEBUG("Process dst entity, id[%u], type[%s].",
dstEntity->GetId(), dstEntity->GetTypeDesc().c_str());
dgw::InnerMessage msg;
msg.msgType = dstEntity->GetCurState() == dgw::FsmState::FSM_FULL_STATE ? dgw::InnerMsgType::INNER_MSG_F2NF :
dgw::InnerMsgType::INNER_MSG_PUSH;
return dstEntity->ProcessMessage(msg);
}
auto &entitiesInGroup = dgw::EntityManager::Instance(index).GetEntitiesInGroup(entity.GetId());
for (auto &entityInGroup : entitiesInGroup) {
BQS_LOG_DEBUG("Process dst entity in group[%u], id[%u], type[%s].",
dstEntity->GetId(), entityInGroup->GetId(), entityInGroup->GetTypeDesc().c_str());
dgw::InnerMessage msg;
msg.msgType = entityInGroup->GetCurState() == dgw::FsmState::FSM_FULL_STATE ?
dgw::InnerMsgType::INNER_MSG_F2NF : dgw::InnerMsgType::INNER_MSG_PUSH;
(void) entityInGroup->ProcessMessage(msg);
}
return dgw::FsmStatus::FSM_SUCCESS;
}
QueueSchedule::~QueueSchedule()
{
running_ = false;
daemonWait_.notify_all();
for (auto &worker : workThreads_) {
if (worker.joinable()) {
worker.join();
}
}
}
void QueueSchedule::WaitForStop()
{
BQS_LOG_RUN_INFO("WaitForStop begin");
RouterServer::GetInstance().Destroy();
StopQueueSchedule();
for (auto &worker : workThreads_) {
if (worker.joinable()) {
worker.join();
}
}
BQS_LOG_INFO("WaitForStop end");
}
* init drv event scheduler.
* @return BQS_STATUS_OK: success, other: error
*/
BqsStatus QueueSchedule::InitDrvSchedModule(const uint32_t deviceId, const uint32_t enqueGroupId,
const uint32_t f2nfGroupId) const
{
BQS_LOG_INFO("Attach device[%u] to drv scheduler", deviceId);
(void)f2nfGroupId;
int32_t ret = halEschedAttachDevice(deviceId);
if ((ret != DRV_ERROR_NONE) && (ret != DRV_ERROR_PROCESS_REPEAT_ADD)) {
BQS_LOG_ERROR("Failed to attach device[%u] for eSched, result[%d].", deviceId, ret);
return BQS_STATUS_DRIVER_ERROR;
}
for (uint32_t idx = 0; idx < initQsParams_.devIdVec.size(); idx++) {
uint32_t currDeviceId = initQsParams_.devIdVec[idx];
ret = halEschedAttachDevice(currDeviceId);
if ((ret != DRV_ERROR_NONE) && (ret != DRV_ERROR_PROCESS_REPEAT_ADD)) {
BQS_LOG_ERROR("Failed to attach device[%u] for eSched, result[%d].", currDeviceId, ret);
return BQS_STATUS_DRIVER_ERROR;
}
if (deviceId_ == currDeviceId) {
continue;
}
QueueSetInputPara inPutParam;
(void)halQueueSet(currDeviceId, QUEUE_ENABLE_LOCAL_QUEUE, &inPutParam);
ret = halQueueInit(currDeviceId);
if ((ret != DRV_ERROR_NONE) && (ret != DRV_ERROR_REPEATED_INIT)) {
BQS_LOG_ERROR("host flow halQueueInit error, ret=[%d]", static_cast<int32_t>(ret));
return BQS_STATUS_DRIVER_ERROR;
}
}
const bool setPidPriorityFlag = (bqs::GetRunContext() == bqs::RunContext::HOST) ?
true : aicpuFeatureSetPidPriority_;
if (setPidPriorityFlag) {
(void)halEschedSetPidPriority(deviceId, PRIORITY_LEVEL0);
}
GROUP_TYPE enqueGrpType = GRP_TYPE_BIND_CP_CPU;
GROUP_TYPE f2nfGrpType = GRP_TYPE_BIND_CP_CPU;
if (hasAICPU_ && !isZeroSizeAicpuNum_ && (bqs::RunContext::HOST != bqs::GetRunContext())) {
enqueGrpType = GRP_TYPE_BIND_DP_CPU;
f2nfGrpType = GRP_TYPE_BIND_DP_CPU;
}
BQS_LOG_INFO("Create enqueGroup[%u] type[%d] on device[%u].", enqueGroupId, enqueGrpType, deviceId);
ret = halEschedCreateGrp(deviceId, enqueGroupId, enqueGrpType);
if (ret != DRV_ERROR_NONE) {
(void)halEschedDettachDevice(deviceId);
BQS_LOG_ERROR("Failed to create enqueGroup, groupId[%u] result[%d].", enqueGroupId, ret);
return BQS_STATUS_DRIVER_ERROR;
}
if (!initQsParams_.numaFlag) {
BQS_LOG_INFO("Create f2nfGroup[%u], type[%d]", f2nfGroupId_, f2nfGrpType);
ret = halEschedCreateGrp(deviceId_, f2nfGroupId_, f2nfGrpType);
if (ret != DRV_ERROR_NONE) {
(void)halEschedDettachDevice(deviceId_);
BQS_LOG_ERROR("Failed to create f2nfGroup, groupId[%u] result[%d].", f2nfGroupId_, ret);
return BQS_STATUS_DRIVER_ERROR;
}
}
return BQS_STATUS_OK;
}
void QueueSchedule::ReportAbnormal() const
{
BQS_LOG_ERROR("Enqueue thread has missed heartbeat for %u seconds", abnormalInterval_);
if ((bqs::GetRunContext() != bqs::RunContext::HOST) &&
(initQsParams_.starter != bqs::QsStartType::START_BY_DEPLOYER) &&
(initQsParams_.runMode != QueueSchedulerRunMode::MULTI_THREAD)) {
const int32_t ret = TsdDestroy(deviceId_, TSD_QS, initQsParams_.pid, initQsParams_.vfId);
if (ret != 0) {
BQS_LOG_ERROR("dev[%u] send abnormal msg to tsdaemon failed, ret[%d]", deviceId_, ret);
}
}
}
}