* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FUNCTION_MASTER_INSTANCE_MANAGER_INSTANCE_MGR_ACTOR_H
#define FUNCTION_MASTER_INSTANCE_MANAGER_INSTANCE_MGR_ACTOR_H
#include <string>
#include <unordered_map>
#include "actor/actor.hpp"
#include "common/constants/actor_name.h"
#include "common/leader/business_policy.h"
#include "common/meta_store_adapter/instance_operator.h"
#include "meta_store_client/meta_store_client.h"
#include "common/meta_store_adapter/meta_store_operate_cacher.h"
#include "meta_store_client/meta_store_struct.h"
#include "common/resource_view/resource_type.h"
#include "function_master/global_scheduler/global_sched.h"
#include "function_master/resource_group_manager/resource_group_manager.h"
#include "group_manager.h"
#include "instance_family_caches.h"
namespace functionsystem::instance_manager {
using InstanceManagerMap = std::unordered_map<std::string, std::shared_ptr<resource_view::InstanceInfo>>;
using InstanceKeyInfoPair = std::pair<std::string, std::shared_ptr<resource_view::InstanceInfo>>;
using GlobalScheduler = functionsystem::global_scheduler::GlobalSched;
using ResourceGroupManager = functionsystem::resource_group_manager::ResourceGroupManager;
struct InstanceManagerStartParam {
bool runtimeRecoverEnable{ false };
bool isMetaStoreEnable{ false };
std::string servicesPath;
std::string libPath;
std::string functionMetaPath;
bool enableAbnormalDoubleCheck;
std::string systemTenantID{ "0" };
};
class InstanceManagerActor : public litebus::ActorBase, public std::enable_shared_from_this<InstanceManagerActor> {
public:
InstanceManagerActor() = delete;
InstanceManagerActor(const std::shared_ptr<MetaStoreClient> &metaClient,
const std::shared_ptr<GlobalScheduler> &scheduler,
const std::shared_ptr<GroupManager> &groupManager,
const std::shared_ptr<ResourceGroupManager> &resourceGroupManager,
const InstanceManagerStartParam ¶m);
~InstanceManagerActor() override = default;
* just for unit test, Get instance by node id.
*
* @param nodeName node id
* @param map instance map ptr
* @return instance map
*/
InstanceManagerMap *Get(const std::string &nodeName, InstanceManagerMap *map);
Status GetAbnormalScheduler(const std::shared_ptr<std::unordered_set<std::string>> &map);
std::unordered_map<std::string, std::unordered_set<std::string>> GetInstanceJobMap();
std::unordered_map<std::string, std::unordered_set<std::string>> GetInstanceFuncMetaMap();
* call by global_scheduler when a local_scheduler fault
*
* @param nodeName local_schedule's id
* @return OK success to migrate instances
*/
litebus::Future<Status> OnLocalSchedFault(const std::string &nodeName);
bool IsLocalAbnormal(const std::string &nodeName);
void ForwardKill(const litebus::AID &from, std::string &&name, std::string &&msg);
void ForwardCustomSignalResponse(const litebus::AID &from, std::string &&name, std::string &&msg);
void TryCancelResponse(const litebus::AID &from, std::string &&name, std::string &&msg);
litebus::Future<Status> TryCancelSchedule(const std::string &id, const messages::CancelType &type,
const std::string &reason);
bool UpdateLeaderInfo(const explorer::LeaderInfo &leaderInfo);
void BindQuotaMgrAID(litebus::AID aid)
{
quotaMgrAID_ = std::move(aid);
}
void HandleSystemUpgrade(bool isUpgrading);
void AddNode(const std::string &nodeName);
void DelNode(const std::string &nodeName);
void SetInstancesReady();
litebus::Future<std::pair<std::string, std::shared_ptr<InstanceInfo>>> GetInstanceInfoByID(
const std::string &instanceID);
std::pair<std::string, std::shared_ptr<InstanceInfo>> GetInstanceInfoByInstanceID(const std::string &instanceID)
{
if (auto it = member_->instID2Instance.find(instanceID); it != member_->instID2Instance.end()) {
return it->second;
}
return {};
}
void OnHealthyStatus(const Status &status);
void ForwardQueryInstancesInfoHandler(const litebus::AID &from, std::string &&name, std::string &&msg);
void OnQueryInstancesInfoFinished(const litebus::AID &from,
const litebus::Future<messages::QueryInstancesInfoResponse> &rsp);
void ForwardQueryInstancesInfoResponseHandler(const litebus::AID &from, std::string &&name, std::string &&msg);
litebus::Future<messages::QueryInstancesInfoResponse> QueryInstancesInfo(
std::shared_ptr<messages::QueryInstancesInfoRequest> req);
litebus::Future<messages::QueryNamedInsResponse> QueryNamedIns(std::shared_ptr<messages::QueryNamedInsRequest> req);
litebus::Future<messages::QueryDebugInstanceInfosResponse> QueryDebugInstancesInfo(
std::shared_ptr<messages::QueryDebugInstanceInfosRequest> req);
void ForwardQueryDebugInstancesInfoHandler(const litebus::AID &from, std::string &&name, std::string &&msg);
void ForwardQueryDebugInstancesInfoResponseHandler(const litebus::AID &from, std::string &&name, std::string &&msg);
void OnQueryDebugInstancesInfoFinished(const litebus::AID &from,
const litebus::Future<messages::QueryDebugInstanceInfosResponse> &rsp);
const std::string &GetSystemTenantID() const
{
return member_->systemTenantID;
}
protected:
void Init() override;
void Finalize() override;
private:
* called when an instance is created or modified.
*
* @param key in metastore
* @param instance will be added
*/
void OnInstancePut(const std::string &key, const std::shared_ptr<resource_view::InstanceInfo> &instance);
* called when an instance is deleted.
*
* @param key in metastore
* @param instance will be deleted
*/
void OnInstanceDelete(const std::string &key, const std::shared_ptr<resource_view::InstanceInfo> &instance);
* called when instance is created, modified or deleted
*
* @param events created, modified or deleted event list
*/
void OnInstanceWatchEvent(const std::vector<WatchEvent> &events, bool synced);
* called when debug_instance_kv is created, modified or deleted
* @param events created, modified or deleted event list
*/
void OnDebugInstanceWatchEvent(const std::vector<WatchEvent> &events, bool synced);
* called when function meta is created, modified or deleted
*
* @param events created, modified or deleted event list
*/
void OnFuncMetaWatchEvent(const std::vector<WatchEvent> &events, bool synced);
* Get all instance and cache
*
* @param response to get all instance
*/
void GetAndWatchInstance();
void GetAndWatchFunctionMeta();
void OnSyncInstance(const std::shared_ptr<GetResponse> &response);
Status OnSyncNodes(const std::unordered_set<std::string> &nodes);
void GetAndWatchDebugInstance();
void OnSyncDebugInstance(const std::shared_ptr<GetResponse> &response);
void OnInstanceWatch(const std::shared_ptr<Watcher> &watcher);
void GetAndWatchAbnormal();
void OnAbnormalSchedulerWatch(const std::shared_ptr<Watcher> &watcher);
void OnAbnormalSchedulerWatchEvent(const std::vector<WatchEvent> &events, bool synced);
bool CheckKillResult(const OperateResult &result, const std::string &instanceID, const std::string &requestID,
const litebus::AID &from);
void EraseAbnormalScheduler(const std::string &nodeName);
void TryReschedule(const std::string &key, const std::shared_ptr<resource_view::InstanceInfo> &instance,
uint32_t retryTimes);
litebus::Future<Status> KillInstanceWithRetry(const std::string& instanceID,
const std::shared_ptr<internal::ForwardKillRequest> &killReq);
void CompleteKillInstance(const litebus::Future<Status> &status, const std::string &requestID,
const std::string &instanceID);
std::shared_ptr<internal::ForwardKillRequest> MakeKillReq(
const std::shared_ptr<resource_view::InstanceInfo> &instanceInfo, const std::string &srcInstanceID,
int32_t signal, const std::string &msg) const
{
core_service::KillRequest killRequest{};
killRequest.set_signal(signal);
killRequest.set_instanceid(instanceInfo->instanceid());
killRequest.set_payload(msg);
auto forwardKillRequest = std::make_shared<internal::ForwardKillRequest>();
auto requestID = litebus::uuid_generator::UUID::GetRandomUUID().ToString();
killRequest.set_requestid(requestID);
forwardKillRequest->set_requestid(requestID);
forwardKillRequest->set_srcinstanceid(srcInstanceID);
forwardKillRequest->set_instancerequestid(instanceInfo->requestid());
*forwardKillRequest->mutable_req() = std::move(killRequest);
return forwardKillRequest;
}
litebus::Future<Status> KillInstanceWithLocalAddr(const litebus::Option<std::string> &localAddressOpt,
const std::shared_ptr<InstanceInfo> &info,
const std::shared_ptr<internal::ForwardKillRequest> &killReq)
{
if (localAddressOpt.IsNone()) {
return Status(ERR_INNER_SYSTEM_ERROR, fmt::format("failed to get local address({}) of instance({})",
info->functionproxyid(), info->instanceid()));
}
auto localAID =
litebus::AID(info->functionproxyid() + LOCAL_SCHED_INSTANCE_CTRL_ACTOR_NAME_POSTFIX, localAddressOpt.Get());
YRLOG_INFO("{}|send instance({}) kill request to local({}), msg({})", killReq->requestid(), info->instanceid(),
std::string(localAID), killReq->req().payload());
(void)Send(localAID, "ForwardCustomSignalRequest", killReq->SerializeAsString());
return Status::OK();
}
void SetKillRetryInterval(int intervalMs)
{
member_->retryKillIntervalMs = intervalMs;
}
void OnPutAbnormalScheduler(const litebus::Future<std::shared_ptr<PutResponse>> &ret,
const std::shared_ptr<litebus::Promise<Status>> &promise,
const std::string &nodeName);
void OnLocalScheduleChange(const std::vector<WatchEvent> &events);
* Periodically report instance count metrics
*/
void ReportInstanceCountPeriodically();
* Periodically garbage collect FATAL instances that exceed timeout
*/
void GarbageCollectFatalInstances();
void OnLocalScheduleWatch(const std::shared_ptr<Watcher> &watcher);
litebus::Future<SyncResult> InstanceInfoSyncer();
litebus::Future<SyncResult> FunctionMetaSyncer();
void DoFunctionMetaSyncer();
litebus::Future<SyncResult> ProxyAbnormalSyncer(const std::shared_ptr<GetResponse> &getResponse);
litebus::Future<SyncResult> OnInstanceInfoSyncer(const std::shared_ptr<GetResponse> &getResponse);
litebus::Future<SyncResult> OnFunctionMetaSyncer(const std::shared_ptr<GetResponse> &getResponse);
litebus::Future<SyncResult> ReplayFailedInstanceOperation(int64_t revision);
void ReplayFailedPutOperation(std::list<litebus::Future<Status>> &futures,
std::shared_ptr<std::set<std::string>> erasePutKeys);
void ReplayFailedDeleteOperation(std::list<litebus::Future<Status>> &futures,
std::shared_ptr<std::set<std::string>> eraseDelKeys);
Status Execute(std::function<Status()> fn);
void DoTryCancel(const litebus::Future<litebus::Option<NodeInfo>> &future,
const std::shared_ptr<messages::CancelSchedule> &cancelRequest,
const std::shared_ptr<litebus::Promise<Status>> &promise);
void OnKillInstance(const litebus::Future<Status> &status, const messages::ForwardKillRequest &req,
const litebus::AID &from);
void ClearAbnormalSchedulerMetaInfo(const std::string &node);
void ClearAbnormalScheduler(const std::string &node);
bool IsInstanceManagedByJob(const std::shared_ptr<InstanceInfo> &info);
private:
struct Member {
std::shared_ptr<GlobalScheduler> globalScheduler{ nullptr };
std::shared_ptr<MetaStoreClient> client{ nullptr };
std::unordered_set<std::string> proxyRouteSet;
std::shared_ptr<Watcher> proxyRouteWatcher{ nullptr };
std::shared_ptr<InstanceOperator> instanceOpt{ nullptr };
std::shared_ptr<Watcher> abnormalSchedulerWatcher{ nullptr };
bool runtimeRecoverEnable{ false };
std::vector<std::shared_ptr<Watcher>> watchers{ nullptr };
bool enableAbnormalDoubleCheck_{ false };
std::shared_ptr<std::unordered_set<std::string>> abnormalScheduler{ nullptr };
std::unordered_map<std::string, litebus::Timer> abnormalDeferTimer;
std::unordered_map<std::string, InstanceManagerMap> instances;
std::unordered_map<std::string, InstanceKeyInfoPair> instID2Instance;
std::unordered_map<std::string, std::shared_ptr<messages::DebugInstanceInfo>> debugInstInfoMap;
bool isUpgrading{ false };
std::shared_ptr<GroupManager> groupManager{ nullptr };
std::shared_ptr<ResourceGroupManager> resourceGroupManager{ nullptr };
std::shared_ptr<InstanceFamilyCaches> family{ nullptr };
std::set<std::string> exitingInstances;
std::unordered_map<std::string, std::shared_ptr<litebus::Promise<Status>>> killReqPromises;
int retryKillIntervalMs = 15000;
std::unordered_map<std::string, std::unordered_set<std::string>> jobID2InstanceIDs;
std::unordered_map<std::string, std::unordered_set<std::string>> funcMeta2InstanceIDs;
std::shared_ptr<MetaStoreOperateCacher> operateCacher = std::make_shared<MetaStoreOperateCacher>();
bool isMetaStoreEnable{ false };
explorer::LeaderInfo leaderInfo;
std::string servicesPath;
std::string libPath;
std::string functionMetaPath;
std::unordered_set<std::string> innerFuncMetaKeys;
std::shared_ptr<litebus::Promise<messages::QueryInstancesInfoResponse>> queryInstancesPromise;
std::shared_ptr<litebus::Promise<messages::QueryDebugInstanceInfosResponse>> queryDebugInstancesPromise;
std::string systemTenantID{ "0" };
};
class Business : public leader::BusinessPolicy {
public:
Business(const std::shared_ptr<Member> &member, const std::shared_ptr<InstanceManagerActor> &actor)
: member_(member), actor_(actor){};
~Business() override = default;
virtual litebus::Future<Status> OnLocalSchedFault(const std::string &nodeName) = 0;
virtual void OnPutAbnormalScheduler(const litebus::Future<std::shared_ptr<PutResponse>> &ret,
const std::shared_ptr<litebus::Promise<Status>> &promise,
const std::string &nodeName) = 0;
virtual void DelNode(const std::string &nodeName, bool force) = 0;
virtual void AddNode(const std::string &nodeName) = 0;
virtual bool NodeExists(const std::string &nodeName) = 0;
virtual void OnSyncNodes(const std::unordered_set<std::string> &nodes) = 0;
virtual bool IsLocalAbnormal(const std::string &nodeName) = 0;
virtual void OnSyncAbnormalScheduler(const InstanceManagerMap &instances) = 0;
virtual void OnFaultLocalInstancePut(const std::string &key,
const std::shared_ptr<resource_view::InstanceInfo> &instance,
const std::string &reason) = 0;
virtual void ForwardKill(const litebus::AID &from, std::string &&name, std::string &&msg) = 0;
virtual void ForwardCustomSignalResponse(const litebus::AID &from, std::string &&name, std::string &&msg) = 0;
virtual void TryReschedule(const std::string &key, const std::shared_ptr<resource_view::InstanceInfo> &instance,
uint32_t retryTimes) = 0;
virtual void OnInstancePutForFamilyManagement(std::shared_ptr<InstanceInfo> info) = 0;
virtual void OnInstanceDeleteForFamilyManagement(const std::string &instanceKey,
const std::shared_ptr<resource_view::InstanceInfo> &info) = 0;
virtual void OnFuncMetaDelete(const std::string &funcKey) = 0;
virtual litebus::Future<messages::QueryInstancesInfoResponse> QueryInstancesInfo(
std::shared_ptr<messages::QueryInstancesInfoRequest> req) = 0;
virtual litebus::Future<messages::QueryDebugInstanceInfosResponse> QueryDebugInstancesInfo(
std::shared_ptr<messages::QueryDebugInstanceInfosRequest> req) = 0;
virtual void GarbageCollectFatalInstances() = 0;
protected:
std::shared_ptr<Member> member_;
std::weak_ptr<InstanceManagerActor> actor_;
};
class MasterBusiness : public Business {
public:
MasterBusiness(const std::shared_ptr<Member> &member, const std::shared_ptr<InstanceManagerActor> &actor)
: Business(member, actor)
{
}
~MasterBusiness() override = default;
void OnChange() override;
litebus::Future<Status> OnLocalSchedFault(const std::string &nodeName) override;
void OnPutAbnormalScheduler(const litebus::Future<std::shared_ptr<PutResponse>> &ret,
const std::shared_ptr<litebus::Promise<Status>> &promise,
const std::string &nodeName) override;
void DelNode(const std::string &nodeName, bool force) override;
void AddNode(const std::string &nodeName) override;
bool NodeExists(const std::string &nodeName) override;
void ResetNodes();
void OnSyncNodes(const std::unordered_set<std::string> &nodes) override;
bool IsLocalAbnormal(const std::string &nodeName) override;
void OnSyncAbnormalScheduler(const InstanceManagerMap &instances) override;
void OnFaultLocalInstancePut(const std::string &key,
const std::shared_ptr<resource_view::InstanceInfo> &instance,
const std::string &reason) override;
void ForwardKill(const litebus::AID &from, std::string &&name, std::string &&msg) override;
void ForwardCustomSignalResponse(const litebus::AID &from, std::string &&name, std::string &&msg) override;
void TryReschedule(const std::string &key, const std::shared_ptr<resource_view::InstanceInfo> &instance,
uint32_t retryTimes) override;
void ProcessInstanceOnFaultLocal(const std::string &nodeName, const std::string &reason);
void ProcessInstanceNotReSchedule(
const std::pair<const std::string, std::shared_ptr<resource_view::InstanceInfo>> &instance,
const std::string &nodeName, const std::string &reason);
void KillAllInstances(const std::list<std::shared_ptr<InstanceInfo>> &allInstances, int32_t signal,
const std::string &msg);
litebus::Future<Status> KillInstance(const std::shared_ptr<InstanceInfo> &info, int32_t signal,
const std::string &msg);
bool IsInstanceShouldBeKilled(const std::shared_ptr<InstanceInfo>& info);
bool IsAppDriverFinished(const std::shared_ptr<InstanceInfo>& info);
void OnInstancePutForFamilyManagement(std::shared_ptr<InstanceInfo> info) override;
void OnInstanceDeleteForFamilyManagement(const std::string &instanceKey,
const std::shared_ptr<resource_view::InstanceInfo> &info) override;
void OnFuncMetaDelete(const std::string &funcKey) override;
void HandleShutDownAll(const litebus::AID &from, const messages::ForwardKillRequest &forwardKillRequest);
void ForceDelete(const std::string &key, const std::shared_ptr<resource_view::InstanceInfo> &instance);
litebus::Future<messages::QueryInstancesInfoResponse> QueryInstancesInfo(
std::shared_ptr<messages::QueryInstancesInfoRequest> req) override;
litebus::Future<messages::QueryDebugInstanceInfosResponse> QueryDebugInstancesInfo(
std::shared_ptr<messages::QueryDebugInstanceInfosRequest> req) override;
void GarbageCollectFatalInstances() override;
private:
bool nodeSynced_{false};
std::unordered_set<std::string> nodes_;
};
class SlaveBusiness : public Business {
public:
SlaveBusiness(const std::shared_ptr<Member> &member, const std::shared_ptr<InstanceManagerActor> &actor)
: Business(member, actor){};
~SlaveBusiness() override = default;
void OnChange() override;
litebus::Future<Status> OnLocalSchedFault(const std::string &) override;
void OnPutAbnormalScheduler(const litebus::Future<std::shared_ptr<PutResponse>> &,
const std::shared_ptr<litebus::Promise<Status>> &, const std::string &) override;
void DelNode(const std::string &nodeName, bool force) override {};
void AddNode(const std::string &nodeName) override {};
bool NodeExists(const std::string &nodeName) override;
void OnSyncNodes(const std::unordered_set<std::string> &nodes) override {};
bool IsLocalAbnormal(const std::string &) override;
void OnSyncAbnormalScheduler(const InstanceManagerMap &) override;
void OnFaultLocalInstancePut(const std::string &, const std::shared_ptr<resource_view::InstanceInfo> &,
const std::string &) override;
void ForwardKill(const litebus::AID &, std::string &&, std::string &&) override;
void ForwardCustomSignalResponse(const litebus::AID &, std::string &&, std::string &&) override;
void TryReschedule(const std::string &key, const std::shared_ptr<resource_view::InstanceInfo> &instance,
uint32_t retryTimes) override;
void OnInstancePutForFamilyManagement(std::shared_ptr<InstanceInfo> info) override;
void OnInstanceDeleteForFamilyManagement(const std::string &instanceKey,
const std::shared_ptr<resource_view::InstanceInfo> &info) override;
void OnFuncMetaDelete(const std::string &funcKey) override;
litebus::Future<messages::QueryInstancesInfoResponse> QueryInstancesInfo(
std::shared_ptr<messages::QueryInstancesInfoRequest> req) override;
litebus::Future<messages::QueryDebugInstanceInfosResponse> QueryDebugInstancesInfo(
std::shared_ptr<messages::QueryDebugInstanceInfosRequest> req) override;
void GarbageCollectFatalInstances() override {};
};
litebus::Future<Status> CheckSyncResponse(const std::shared_ptr<GetResponse> &response);
void CommitSuicide();
std::shared_ptr<Member> member_{ nullptr };
litebus::AID quotaMgrAID_;
std::unordered_map<std::string, std::shared_ptr<Business>> businesses_;
std::string curStatus_;
std::shared_ptr<Business> business_{ nullptr };
int64_t cancelTimout_;
std::unordered_map<std::string, std::shared_ptr<litebus::Promise<Status>>> cancelPromise_;
bool isSuicide_{ false };
litebus::Promise<bool> isInstancesReady_;
friend class DISABLED_InstanceManagerTest;
};
}
#endif