/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "local_sched_driver.h"
#include "common/constants/actor_name.h"
#include "local_scheduler/traefik_registry/traefik_registry.h"
#include "meta_store_monitor/meta_store_monitor_factory.h"
#include "common/utils/param_check.h"
#include "local_scheduler/bundle_manager/bundle_mgr_actor.h"
#include "local_scheduler/debug_instance_info_monitor/debug_instance_info_monitor.h"
#include "local_scheduler/instance_control/posix_api_handler/posix_api_handler.h"
#include "local_scheduler/gc_actor/local_gc_actor.h"
#include "local_scheduler/local_group_ctrl/local_group_ctrl_actor.h"
#include "local_scheduler/grpc_server/bus_service/bus_service.h"
namespace functionsystem::local_scheduler {
// Name given to the local scheduler's HttpServer instance (see Create()).
const std::string LOCAL_SCHEDULER{ "local-scheduler" };
namespace {
/**
 * Extract the host/IP part from an address string in "ip:port" format.
 * This is used to get the IP from the LiteBus address for gRPC servers.
 *
 * Handled forms:
 *   "10.0.0.1:8080"  -> "10.0.0.1"
 *   "[::1]:8080"     -> "::1"   (brackets of an IPv6 literal are stripped)
 *   "[::1]"          -> "::1"   (bracketed IPv6 literal without a port)
 *   "hostname"       -> "hostname" (no colon: returned unchanged)
 *
 * @param address Full address, usually in "ip:port" or "[ipv6]:port" format
 * @return Extracted IP address, or the original address if no port separator was found
 */
std::string ExtractIPFromAddress(const std::string &address)
{
    // A bracketed IPv6 literal is unambiguous: take whatever is inside the brackets. This has to be checked
    // before searching for the port separator, because in "[::1]" the last ':' lies inside the brackets.
    if (!address.empty() && address.front() == '[') {
        const std::string::size_type closeBracketPos = address.find(']', 1);
        if (closeBracketPos != std::string::npos) {
            return address.substr(1, closeBracketPos - 1);
        }
    }
    // Otherwise split off the port at the last ':'. A leading ':' (empty host) leaves the address unchanged.
    const std::string::size_type colonPos = address.find_last_of(':');
    if (colonPos != std::string::npos && colonPos > 0) {
        return address.substr(0, colonPos);
    }
    return address;
}
}
/**
 * Fill config.runtimeConfig from the driver start parameters: runtime heartbeat/timeout settings plus the
 * data-system auth and encryption material held by param_.dsAuthConfig.
 *
 * @param config Instance-control configuration whose runtimeConfig member is overwritten.
 */
void LocalSchedDriver::SetRuntimeConfig(InstanceCtrlConfig &config)
{
    // dsAuthConfig is dereferenced below; ASSERT_IF_NULL is the project's null guard for it.
    ASSERT_IF_NULL(param_.dsAuthConfig);
    const auto &dsAuth = param_.dsAuthConfig;
    config.runtimeConfig = RuntimeConfig{
        .runtimeHeartbeatEnable = param_.runtimeHeartbeatEnable,
        .runtimeMaxHeartbeatTimeoutTimes = param_.runtimeMaxHeartbeatTimeoutTimes,
        .runtimeHeartbeatTimeoutMS = param_.runtimeHeartbeatTimeoutMS,
        .runtimeInitCallTimeoutMS = param_.runtimeInitCallTimeoutMS,
        .runtimeShutdownTimeoutSeconds = param_.runtimeShutdownTimeoutSeconds,
        .runtimeDsAuthEnable = dsAuth->isRuntimeEnable,
        .runtimeDsEncryptEnable = dsAuth->isRuntimeEncryptEnable,
        .dataSystemAccessKey = dsAuth->ak,
        .dataSystemSecurityKey = dsAuth->sk,
        .runtimeDsClientPublicKey = dsAuth->clientPublicKey,
        .runtimeDsClientPrivateKey = dsAuth->clientPrivateKey,
        .runtimeDsServerPublicKey = dsAuth->serverPublicKey,
    };
    // NOTE(review): runtimeRecoverEnable is logged here but is not copied into RuntimeConfig by this function.
    YRLOG_INFO(
        "runtime heartbeat config: runtimeHeartbeatEnable: {}, runtimeMaxHeartbeatTimeoutTimes: {}, "
        "runtimeHeartbeatTimeoutMS: {}, runtimeRecoverEnable: {}, runtimeInitCallTimeoutMS:{}, "
        "runtimeShutdownTimeoutSeconds:{} ",
        param_.runtimeHeartbeatEnable, param_.runtimeMaxHeartbeatTimeoutTimes, param_.runtimeHeartbeatTimeoutMS,
        param_.runtimeRecoverEnable, param_.runtimeInitCallTimeoutMS, param_.runtimeShutdownTimeoutSeconds);
}
/**
 * Construct the local scheduler components from param_ and wire the bindings that do not need them started.
 *
 * Components: resource view manager, local scheduler service, function agent manager, abnormal processor,
 * resource group control, instance control (plus its POSIX API handler bindings), the optional Traefik registry,
 * the subscription manager and the HTTP server with its health-check route. The HTTP server is spawned later,
 * in Start().
 *
 * @return Always Status::OK(); a failure to register the health-check route is only logged.
 */
Status LocalSchedDriver::Create()
{
    metaStorageAccessor_ = std::make_shared<MetaStorageAccessor>(metaStoreClient_);
    resourceViewMgr_ = std::make_shared<resource_view::ResourceViewMgr>();
    resourceViewMgr_->Init(param_.nodeID, param_.resourceViewActorParam);
    localSchedSrv_ = LocalSchedSrv::Create(param_.localSchedSrvParam);
    funcAgentMgr_ = FunctionAgentMgr::Create(param_.nodeID, param_.funcAgentMgrParam, metaStoreClient_);
    abnormalProcessor_ = AbnormalProcessor::Create(param_.nodeID);
    rGroupCtrl_ = ResourceGroupCtrl::Init();
    InstanceCtrlConfig config{};
    SetRuntimeConfig(config);
    config.maxGrpcSize = param_.maxGrepSize;
    config.connectTimeout = param_.runtimeConnTimeoutSeconds;
    config.isPseudoDataPlane = param_.isPseudoDataPlane;
    config.cacheStorageHost = param_.cacheStorageHost;
    config.limitResource = {
        .minCpu = param_.limitResource.minCpu,
        .minMemory = param_.limitResource.minMemory,
        .maxCpu = param_.limitResource.maxCpu,
        .maxMemory = param_.limitResource.maxMemory,
    };
    config.enableServerMode = param_.enableServerMode;
    config.enableSSL = param_.enableSSL;
    config.serverRootCert = param_.serverRootCert;
    config.serverNameOverride = param_.serverNameOverride;
    config.posixPort = param_.posixPort;
    config.schedulePlugins = param_.schedulePlugins;
    config.enableTenantAffinity = param_.enableTenantAffinity;
    config.createLimitationEnable = param_.createLimitationEnable;
    config.tokenBucketCapacity = param_.tokenBucketCapacity;
    config.isMetaStoreEnabled = param_.isMetaStoreEnabled;
    config.isPartialWatchInstances = param_.isPartialWatchInstances;
    config.maxPriority = param_.maxPriority;
    config.enablePreemption = param_.enablePreemption;
    config.enableFakeSuspendResume = param_.enableFakeSuspendResume;
    config.udsPath = param_.udsPath;
    // The proxy gRPC address combines the host part of the LiteBus address with the session port when a
    // dedicated session server is configured ("0" disables it), otherwise with the regular gRPC listen port.
    std::string externalIP = ExtractIPFromAddress(param_.address);
    // ExtractIPFromAddress strips the brackets from an IPv6 literal; put them back so that "host:port" stays
    // parseable (e.g. "[::1]:8080" rather than the ambiguous "::1:8080").
    if (externalIP.find(':') != std::string::npos) {
        externalIP = "[" + externalIP + "]";
    }
    if (param_.sessionGrpcPort != "0") {
        config.proxyGrpcAddress = externalIP + ":" + param_.sessionGrpcPort;
    } else {
        config.proxyGrpcAddress = externalIP + ":" + param_.grpcListenPort;
    }
    instanceCtrl_ = InstanceCtrl::Create(param_.nodeID, config);
    PosixAPIHandler::BindInstanceCtrl(instanceCtrl_);
    PosixAPIHandler::BindControlClientManager(param_.controlInterfacePosixMgr);
    PosixAPIHandler::BindLocalSchedSrv(localSchedSrv_);
    PosixAPIHandler::BindResourceGroupCtrl(rGroupCtrl_);
    PosixAPIHandler::SetMaxPriority(param_.maxPriority);
    if (param_.enableTraefikRegistry) {
        traefikRegistry_ = std::make_shared<TraefikRegistry>(
            metaStorageAccessor_,
            param_.traefikEtcdPrefix,
            param_.traefikHttpEntryPoint,
            param_.traefikEnableTLS,
            param_.traefikServersTransport);
        instanceCtrl_->SetTraefikRegistry(traefikRegistry_);
        YRLOG_INFO("TraefikRegistry initialized and injected: prefix={}, entryPoint={},"
                   " enableTLS={}, serversTransport={}",
                   param_.traefikEtcdPrefix, param_.traefikHttpEntryPoint,
                   param_.traefikEnableTLS, param_.traefikServersTransport);
    } else {
        YRLOG_INFO("Traefik registry disabled");
    }
    subscriptionMgr_ = SubscriptionMgr::Init(param_.nodeID,
        SubscriptionMgrConfig{ .isPartialWatchInstances = param_.isPartialWatchInstances });
    subscriptionMgr_->BindInstanceCtrl(instanceCtrl_);
    subscriptionMgr_->BindLocalSchedSrv(localSchedSrv_);
    httpServer_ = std::make_shared<HttpServer>(LOCAL_SCHEDULER);
    apiRouteRegister_ = std::make_shared<DefaultHealthyRouter>(param_.nodeID);
    metaStoreHealthyObserver_ = std::make_shared<InstanceCtrlMetaStoreHealthyObserver>(instanceCtrl_);
    // Best-effort: the scheduler keeps starting even if the health-check route cannot be registered.
    if (auto registerStatus(httpServer_->RegisterRoute(apiRouteRegister_)); registerStatus != StatusCode::SUCCESS) {
        YRLOG_ERROR("failed to register health check api router.");
    }
    return Status::OK();
}
std::string GetMonitorAddress(const LocalSchedStartParam ¶m)
{
return param.metaStoreAddress;
}
/**
 * Build, wire and start all local scheduler components.
 *
 * Steps, in order:
 *   1. Create() the components.
 *   2. Start the POSIX/driver gRPC servers and instance control.
 *   3. Create the SnapCtrl, GC actor, bundle manager and local group control actors.
 *   4. Start the local scheduler service, function agent manager and abnormal processor.
 *   5. Spawn the HTTP server and register meta store health observers.
 *   6. Optionally start the data-system health checker and the debug instance info monitor.
 *
 * @return The error returned by Create(), Status(StatusCode::FAILED) when the gRPC servers cannot be started,
 *         otherwise Status::OK(). isStarted_ is only set on success, so Stop() skips graceful unregistration
 *         after a failed start.
 */
Status LocalSchedDriver::Start()
{
YRLOG_INFO(
"start local scheduler driver, nodeID: {}, global scheduler address: {}, scheduler policy: {}, "
"meta store address: {}, driver gateway service enable: {}, enablePrintResourceView: {}",
param_.nodeID, param_.globalSchedulerAddress, param_.schedulePolicy, param_.metaStoreAddress,
param_.enableDriver, param_.enablePrintResourceView);
if (auto status(Create()); status != StatusCode::SUCCESS) {
return status;
}
// The gRPC servers are created after Create() because BusService/ExecStreamService need instanceCtrl_.
if (!CreatePosixAndDriverServer()) {
YRLOG_ERROR("failed to start posix and driver server");
return Status(StatusCode::FAILED);
}
BindInstanceCtrl();
// Snapshot control depends on instance control, so it is wired both ways.
snapCtrl_ = SnapCtrl::Create(param_.nodeID);
snapCtrl_->BindFunctionAgentMgr(funcAgentMgr_);
snapCtrl_->BindLocalSchedSrv(localSchedSrv_);
snapCtrl_->BindInstanceCtrl(instanceCtrl_);
snapCtrl_->BindClientManager(param_.controlInterfacePosixMgr);
instanceCtrl_->BindSnapCtrl(snapCtrl_);
// Local GC actor: bound to instance control and its control view, then spawned.
gcActor_ = std::make_shared<LocalGcActor>(LOCAL_GC_ACTOR_NAME, param_.nodeID);
gcActor_->BindInstanceControlView(instanceCtrl_->GetInstanceControlView());
gcActor_->BindInstanceCtrl(instanceCtrl_);
litebus::Spawn(gcActor_);
abnormalProcessor_->BindMetaStoreClient(metaStoreClient_);
abnormalProcessor_->BindObserver(param_.controlPlaneObserver);
abnormalProcessor_->BindInstanceCtrl(instanceCtrl_);
abnormalProcessor_->BindRaiseWrapper(std::make_shared<RaiseWrapper>());
abnormalProcessor_->BindFunctionAgentMgr(funcAgentMgr_);
localSchedSrv_->Start(instanceCtrl_, resourceViewMgr_);
funcAgentMgr_->Start(instanceCtrl_, resourceViewMgr_->GetInf(resource_view::ResourceType::PRIMARY));
abnormalProcessor_->Start();
localSchedSrv_->BindFunctionAgentMgr(funcAgentMgr_);
localSchedSrv_->BindSubscriptionMgr(subscriptionMgr_);
funcAgentMgr_->BindLocalSchedSrv(localSchedSrv_);
// Bundle manager: the actor is wrapped by bundleMgr_ (the handle passed to other components) and spawned.
BundleManagerActorParam bundleManagerActorParam = {
.actorName = BUNDLE_MGR_ACTOR_NAME,
.nodeId = param_.nodeID,
.metaStoreClient = metaStoreClient_
};
auto bundleMgrActor = std::make_shared<BundleMgrActor>(bundleManagerActorParam);
bundleMgr_ = std::make_shared<BundleMgr>(bundleMgrActor);
bundleMgrActor->BindInstanceCtrl(instanceCtrl_);
bundleMgrActor->BindLocalSchedSrv(localSchedSrv_);
bundleMgrActor->BindResourceViewMgr(resourceViewMgr_);
bundleMgrActor->BindScheduler(instanceCtrl_->GetScheduler());
litebus::Spawn(bundleMgrActor);
funcAgentMgr_->BindBundleMgr(bundleMgr_);
param_.controlPlaneObserver->AttachTenantListener(funcAgentMgr_);
// Local group control: same actor + wrapper pattern as the bundle manager; also exposed to PosixAPIHandler.
auto localGroupCtrlActor =
std::make_shared<LocalGroupCtrlActor>(LOCAL_GROUP_CTRL_ACTOR_NAME, param_.nodeID, metaStoreClient_);
localGroupCtrl_ = std::make_shared<LocalGroupCtrl>(localGroupCtrlActor);
localGroupCtrlActor->BindScheduler(instanceCtrl_->GetScheduler());
localGroupCtrlActor->BindLocalSchedSrv(localSchedSrv_);
localGroupCtrlActor->BindControlInterfaceClientManager(param_.controlInterfacePosixMgr);
localGroupCtrlActor->BindInstanceCtrl(instanceCtrl_);
localGroupCtrlActor->BindResourceView(resourceViewMgr_);
PosixAPIHandler::BindLocalGroupCtrl(localGroupCtrl_);
litebus::Spawn(localGroupCtrlActor);
(void)litebus::Spawn(httpServer_);
if (param_.dataObjClient != nullptr) {
param_.dataObjClient->InitDistributedCacheClient();
}
// Meta store health changes are forwarded to the function agent manager, instance control and group control.
auto monitor = MetaStoreMonitorFactory::GetInstance().GetMonitor(GetMonitorAddress(param_));
if (monitor != nullptr) {
monitor->RegisterHealthyObserver(funcAgentMgr_);
monitor->RegisterHealthyObserver(metaStoreHealthyObserver_);
monitor->RegisterHealthyObserver(localGroupCtrl_);
} else {
YRLOG_WARN("failed to get monitor of address {}.", GetMonitorAddress(param_));
}
// When a unit of the PRIMARY resource view is disabled, ask the local scheduler service to delete the pod of
// that agent, tagged with a unique "disable-agent-<uuid>" identifier.
resourceViewMgr_->GetInf(resource_view::ResourceType::PRIMARY)
->RegisterUnitDisableFunc([localSchedSrv(localSchedSrv_)](const std::string &agentName) {
localSchedSrv->DeletePod(agentName,
"disable-agent-" + litebus::uuid_generator::UUID::GetRandomUUID().ToString(),
"agent disabled");
});
if (param_.distributedCacheClient != nullptr && param_.distributedCacheClient->IsDsClientEnable()) {
StartDsHealthyCheck();
}
if (param_.runtimeInstanceDebugEnable) {
StartDebugInstanceInfoMonitor();
}
isStarted_ = true;
return Status::OK();
}
/**
 * Synchronize the state of the scheduler's stateful components.
 *
 * @return The error status returned by ActorSync, or Status::OK() once every component is in sync.
 */
Status LocalSchedDriver::Sync()
{
    if (auto status = ActorSync(
            { abnormalProcessor_, funcAgentMgr_, instanceCtrl_, localGroupCtrl_, localSchedSrv_, bundleMgr_, snapCtrl_ });
        status.IsError()) {
        return status;
    }
    YRLOG_INFO("successful to sync state of local scheduler");
    return Status::OK();
}
/**
 * Recover the state of the scheduler's stateful components.
 *
 * @return The error status returned by ActorRecover, or Status::OK() after a successful recovery.
 */
Status LocalSchedDriver::Recover()
{
    if (auto status = ActorRecover(
            { abnormalProcessor_, funcAgentMgr_, instanceCtrl_, localGroupCtrl_, localSchedSrv_, bundleMgr_, snapCtrl_ });
        status.IsError()) {
        return status;
    }
    YRLOG_INFO("successful to recover local scheduler");
    return Status::OK();
}
/**
 * Mark the scheduler components ready.
 *
 * Covers the same components handled by Sync()/Recover() plus the PRIMARY and VIRTUAL resource views.
 */
void LocalSchedDriver::ToReady()
{
    const auto &primaryView = resourceViewMgr_->GetInf(resource_view::ResourceType::PRIMARY);
    const auto &virtualView = resourceViewMgr_->GetInf(resource_view::ResourceType::VIRTUAL);
    ActorReady({ abnormalProcessor_, funcAgentMgr_, instanceCtrl_, localGroupCtrl_, localSchedSrv_, bundleMgr_,
                 primaryView, virtualView, snapCtrl_ });
}
/**
 * Stop the local scheduler.
 *
 * Shutdown order:
 *   1. Unregister via graceful shutdown, only when param_.unRegisterWhileStop is set and Start() completed.
 *   2. Close the exec-stream sessions and release the session gRPC server.
 *   3. Terminate the driver-owned actors and the managed components.
 *
 * @return Always Status::OK().
 */
Status LocalSchedDriver::Stop()
{
    const bool unregisterFirst = param_.unRegisterWhileStop && localSchedSrv_ != nullptr && isStarted_;
    if (unregisterFirst) {
        // Wait for GracefulShutdown() via Get(); its result is intentionally ignored.
        (void)localSchedSrv_->GracefulShutdown().Get();
    }
    if (execStreamService_ != nullptr) {
        YRLOG_INFO("Closing ExecStreamService sessions");
        execStreamService_->CloseAllSessions();
    }
    if (sessionGrpcServer_ != nullptr) {
        sessionGrpcServer_.reset();
        YRLOG_INFO("session grpc server stopped");
    }
    execStreamService_.reset();
    // Each of these stays null if the part of Start()/Create() that builds it never ran.
    const auto terminateActor = [](const auto &actor) {
        if (actor != nullptr) {
            litebus::Terminate(actor->GetAID());
        }
    };
    terminateActor(dsHealthyChecker_);
    terminateActor(httpServer_);
    terminateActor(gcActor_);
    StopActor(
        { abnormalProcessor_, funcAgentMgr_, instanceCtrl_, localGroupCtrl_, localSchedSrv_, bundleMgr_, snapCtrl_ });
    return Status::OK();
}
void LocalSchedDriver::Await()
{
if (dsHealthyChecker_) {
litebus::Await(dsHealthyChecker_->GetAID());
}
if (httpServer_) {
litebus::Await(httpServer_->GetAID());
}
if (gcActor_) {
litebus::Await(gcActor_->GetAID());
}
AwaitActor(
{ abnormalProcessor_, funcAgentMgr_, instanceCtrl_, localGroupCtrl_, localSchedSrv_, bundleMgr_, snapCtrl_ });
}
/**
 * Start instance control and bind it to the components it uses.
 *
 * Start() receives the function agent manager, resource views, control-plane observer, aggregation strategy and
 * max priority. It is then bound to the control interface client manager, meta store client, local scheduler
 * service, internal IAM, data object client, resource group control and subscription manager.
 */
void LocalSchedDriver::BindInstanceCtrl()
{
    auto &ctrl = instanceCtrl_;
    ctrl->SetEnablePrintResourceView(param_.enablePrintResourceView);
    ctrl->Start(funcAgentMgr_, resourceViewMgr_, param_.controlPlaneObserver, param_.aggregatedStrategy_,
                param_.maxPriority);
    ctrl->BindControlInterfaceClientManager(param_.controlInterfacePosixMgr);
    ctrl->BindMetaStoreClient(metaStoreClient_);
    ctrl->BindLocalSchedSrv(localSchedSrv_);
    ctrl->BindInternalIAM(param_.internalIAM);
    ctrl->BindDataObjClient(param_.dataObjClient);
    ctrl->BindResourceGroupCtrl(rGroupCtrl_);
    ctrl->BindSubscriptionMgr(subscriptionMgr_);
}
/**
 * Create and spawn the data-system health checker.
 *
 * Every health notification it publishes is forwarded to instance control and to the local scheduler service.
 */
void LocalSchedDriver::StartDsHealthyCheck()
{
    YRLOG_INFO("enable ds healthy checker, check ds api with interval {} by max {} times", param_.dsHealthCheckInterval,
               param_.maxDsHealthCheckTimes);
    auto checker = std::make_shared<DsHealthyChecker>(param_.dsHealthCheckInterval, param_.maxDsHealthCheckTimes,
                                                      param_.distributedCacheClient);
    checker->SubscribeDsHealthy([instanceCtrl(instanceCtrl_), localSchedSrv(localSchedSrv_)](const bool isHealthy) {
        instanceCtrl->NotifyDsHealthy(isHealthy);
        (void)localSchedSrv->NotifyDsHealthy(isHealthy);
    });
    dsHealthyChecker_ = checker;
    (void)litebus::Spawn(dsHealthyChecker_);
}
/**
 * Spawn a DebugInstanceInfoMonitor bound to funcAgentMgr_ and asynchronously invoke its Start().
 *
 * The monitor is configured with QUERY_DEBUG_INSTANCE_INFO_INTERVAL_MS, presumably its polling period in ms.
 */
void LocalSchedDriver::StartDebugInstanceInfoMonitor()
{
    // Log the interval actually handed to the monitor rather than a hard-coded value that can drift from it.
    YRLOG_INFO("enable debug_instance_info_monitor, check debug_instance_info with interval {}ms",
               QUERY_DEBUG_INSTANCE_INFO_INTERVAL_MS);
    // NOTE(review): the driver keeps no reference to this monitor, so Stop()/Await() never terminate or await it.
    // Retaining it would require a new member in local_sched_driver.h.
    auto infoMonitor = std::make_shared<DebugInstanceInfoMonitor>(funcAgentMgr_, QUERY_DEBUG_INSTANCE_INFO_INTERVAL_MS);
    litebus::Spawn(infoMonitor);
    litebus::Async(infoMonitor->GetAID(), &DebugInstanceInfoMonitor::Start);
}
/**
 * Create and start the gRPC servers used by runtimes and drivers.
 *
 * POSIX server:
 *   - Configured with param_.ip, param_.posixPort and param_.udsPath.
 *   - Hosts the BusService, plus param_.posixService when server mode is enabled.
 *
 * ExecStreamService:
 *   - Hosted on a dedicated session server bound to the IP of the LiteBus address and param_.sessionGrpcPort.
 *   - Hosted on the POSIX server instead when the session port is "0".
 *
 * When SSL is enabled, both servers use param_.creds; otherwise they use insecure credentials.
 *
 * @return true when every server started and became ready. false when:
 *         - the session port collides with the POSIX port,
 *         - SSL is enabled without credentials, or
 *         - a server did not become ready.
 */
bool LocalSchedDriver::CreatePosixAndDriverServer()
{
    if (param_.sessionGrpcPort != "0" && param_.sessionGrpcPort == param_.posixPort) {
        YRLOG_ERROR("Session gRPC port ({}) conflicts with POSIX port ({}), cannot start",
                    param_.sessionGrpcPort, param_.posixPort);
        return false;
    }
    functionsystem::grpc::CommonGrpcServerConfig serverConfig;
    serverConfig.ip = param_.ip;
    serverConfig.listenPort = param_.posixPort;
    serverConfig.udsPath = param_.udsPath;
    serverConfig.creds = ::grpc::InsecureServerCredentials();
    if (param_.enableSSL) {
        if (param_.creds == nullptr) {
            // Previously this failed silently; say why so the caller's generic error is actionable.
            YRLOG_ERROR("SSL is enabled but no server credentials are configured, cannot start posix grpc server");
            return false;
        }
        serverConfig.creds = param_.creds;
    }
    posixGrpcServer_ = std::make_shared<functionsystem::grpc::CommonGrpcServer>(serverConfig);
    if (param_.enableServerMode) {
        param_.posixService->BindInternalIAM(param_.internalIAM);
        posixGrpcServer_->RegisterService(param_.posixService);
    }
    BusServiceParam serviceParam{ .nodeID = param_.nodeID,
                                  .controlPlaneObserver = param_.controlPlaneObserver,
                                  .controlInterfaceClientMgr = param_.controlInterfacePosixMgr,
                                  .instanceCtrl = instanceCtrl_,
                                  .localSchedSrv = localSchedSrv_,
                                  .isEnableServerMode = param_.enableServerMode,
                                  .hostIP = param_.ip };
    std::shared_ptr<BusService> busService = std::make_shared<BusService>(std::move(serviceParam));
    posixGrpcServer_->RegisterService(busService);
    execStreamService_ = std::make_shared<ExecStreamService>(instanceCtrl_->GetActorAID());
    if (param_.sessionGrpcPort != "0") {
        functionsystem::grpc::CommonGrpcServerConfig sessionConfig;
        sessionConfig.ip = ExtractIPFromAddress(param_.address);
        sessionConfig.listenPort = param_.sessionGrpcPort;
        sessionConfig.creds = ::grpc::InsecureServerCredentials();
        if (param_.enableSSL) {
            // Already checked non-null above: a null param_.creds with SSL enabled returns false earlier.
            sessionConfig.creds = param_.creds;
        }
        sessionGrpcServer_ = std::make_shared<functionsystem::grpc::CommonGrpcServer>(sessionConfig);
        sessionGrpcServer_->RegisterService(execStreamService_);
        sessionGrpcServer_->Start();
        if (!sessionGrpcServer_->WaitServerReady()) {
            YRLOG_ERROR("failed to start session grpc server on port {}", param_.sessionGrpcPort);
            return false;
        }
        YRLOG_INFO("Session gRPC server started on port {}, ExecStreamService listening for connections",
                   param_.sessionGrpcPort);
    } else {
        posixGrpcServer_->RegisterService(execStreamService_);
        YRLOG_INFO("ExecStreamService registered on posix port {} (session server disabled)",
                   param_.posixPort);
    }
    posixGrpcServer_->Start();
    if (!posixGrpcServer_->WaitServerReady()) {
        YRLOG_ERROR("failed to start posix grpc server.");
        return false;
    }
    YRLOG_INFO("POSIX gRPC server started on port {}", param_.posixPort);
    return true;
}
}