* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "function_master/scaler/scaler_actor.h"
#include "nlohmann/json.hpp"
#include "async/collect.hpp"
#include "async/defer.hpp"
#include "common/constants/actor_name.h"
#include "common/constants/constants.h"
#include "common/logs/logging.h"
#include "common/metrics/metrics_adapter.h"
#include "common/status/status.h"
#include "common/types/instance_state.h"
#include "common/utils/files.h"
#include "common/utils/meta_store_kv_operation.h"
namespace functionsystem::scaler {
const std::string ORIGINAL_POOL_NAME = "function-agent";
const std::string RUNTIME_MANAGER_CONTAINER_NAME = "runtime-manager";
const std::string FUNCTION_AGENT_CONTAINER_NAME = "function-agent";
const std::string FUNCTION_AGENT_INIT_CONTAINER_NAME = "function-agent-init";
const std::string DS_WORKER_NAME = "ds-worker";
const std::string DELEGATE_CONTAINER_NAME = "runtime";
const std::string LABEL_SCHEDULE_POLICY = "schedule-policy";
const std::string CPU4COMP_ENV = "CPU4COMP";
const std::string MEM4COMP_ENV = "MEM4COMP";
const std::string INIT_LABELS_ENV = "INIT_LABELS";
const std::string TAINT_EFFECT_PREFER_NO_SCHEDULE = "PreferNoSchedule";
const std::string SYSTEM_FUNCTION_SERVICE_ACCOUNT_NAME = "system-function";
const std::string SYSTEM_CLUSTER_CRITICAL_CLASS_NAME = "system-cluster-critical";
const std::string FUNCTION_AGENT_DYNAMIC = "function-agent-dynamic";
const std::string ANNOTATION_KEY_PREFIX = "yr-labels-";
const std::string ANNOTATION_PLACEHOLDER = "yr-default";
const std::string DELEGATE_POD_LABELS = "DELEGATE_POD_LABELS";
const std::string SEMICOLON_SIGN_SPLIT = ";";
const std::string EQUAL_SIGN_SPLIT = "=";
const std::string CPU_BIND_CPU_VALUE = "1000m";
const std::string CPU_BIND_MEM_VALUE = "900Mi";
const std::string EPHEMERAL_STORAGE_RESOURCE = "ephemeral-storage";
const std::string DEFAULT_EPHEMERAL_STORAGE_VALUE = "500Mi";
const int LABEL_ITEM_LENGTH = 2;
const uint32_t WAIT_POD_READY_TIMEOUT = 5 * 60 * 1000;
const uint32_t DEFAULT_EVICT_TIMEOUT = 60;
const uint32_t DEFAULT_SECERT_MODE = 288;
const uint32_t CHECK_SYSTEM_FUNCTION_INTERVAL = 60000;
const std::string CONTAINER_ID_PREFIX = "docker://";
const std::string EVICT_ANNOTATION_KEY = "cluster-autoscaler.kubernetes.io/safe-to-evict";
using namespace functionsystem::explorer;
ScalerActor::ScalerActor(const std::string &name) : ActorBase(name)
{
}
ScalerActor::ScalerActor(const std::string &name, const std::shared_ptr<KubeClient> &kubeClient,
const std::shared_ptr<MetaStorageAccessor> &metaStorageAccessor, const ScalerParams ¶ms)
: ActorBase(name)
{
member_ = std::make_shared<Member>();
member_->kubeClient = kubeClient;
member_->metaStorageAccessor = metaStorageAccessor;
member_->k8sNamespace = params.k8sNamespace;
member_->poolManager = std::make_shared<PoolManager>(metaStorageAccessor);
member_->poolManager->RegisterScaleUpHandler([aid(GetAID())](const std::string &poolID, bool isReserved) {
litebus::Async(aid, &ScalerActor::DoScaleUpPodByPoolID, poolID, isReserved, "", litebus::AID());
});
member_->poolManager->RegisterPersistHandler(
[aid(GetAID())](const std::string &poolID) { litebus::Async(aid, &ScalerActor::PersistencePoolInfo, poolID); });
member_->systemUpgradeParam = params.systemUpgradeParam;
member_->gracePeriodSeconds = params.gracePeriodSeconds;
member_->agentTemplatePath = params.agentTemplatePath;
member_->poolConfigPath = params.poolConfigPath;
if (params.enableFrontendPool) {
std::unordered_map<std::string, std::string> labelSelector = { { REUSE_LABEL_KEY, "true" } };
member_->frontendManager = std::make_shared<SystemFunctionPodManager>(FRONTEND_FUNCTION_NAME, labelSelector);
member_->frontendManager->RegisterScaleUpHandler(params.systemUpgradeParam.handlers.scaleUpSystemFuncHandler);
member_->frontendManager->RegisterScaleDownHandler([aid(GetAID())](const std::string &podName) {
litebus::Async(aid, &ScalerActor::DeletePodByName, podName);
});
}
}
Status ScalerActor::Start()
{
(void)member_->kubeClient->RegisterK8sStatusChangeHandler(
K8sStatusChangeEvent::ON_RECOVER, "FUNCTION_MASTER_SCALER", [this]() { RegisterKubeWatcher(); });
RegisterKubeWatcher();
if (!member_->poolConfigPath.empty() && litebus::os::ExistPath(member_->poolConfigPath)) {
(void)member_->poolManager->LoadPodPoolsConfig(member_->poolConfigPath);
ASSERT_IF_NULL(business_);
(void)business_->CreatePodPools(member_->poolManager->GetLocalPodPools());
}
member_->isSynced = true;
litebus::AsyncAfter(POD_STATUS_CHECK_INTERVAL_MS, GetAID(), &ScalerActor::StartCheckPodStatus);
if (member_->frontendManager != nullptr) {
litebus::AsyncAfter(CHECK_SYSTEM_FUNCTION_INTERVAL, GetAID(), &ScalerActor::StartCheckSystemFunctionNeedScale);
}
return Status::OK();
}
void ScalerActor::RegisterKubeWatcher()
{
WatchHandler podEventHandler = [aid(GetAID())](const K8sEventType &type, const std::shared_ptr<ModelBase> &model) {
litebus::Async(aid, &ScalerActor::HandlePodModifiedEvent, type, model);
};
(void)member_->kubeClient->RegisterWatchHandler("V1Pod", K8sEventType::EVENT_TYPE_MODIFIED,
std::move(podEventHandler));
(void)member_->kubeClient->RegisterWatchHandler("V1Pod", K8sEventType::EVENT_TYPE_DELETE,
std::move(podEventHandler));
(void)member_->kubeClient->ListNamespacedPod(member_->k8sNamespace, true);
WatchHandler nodeEventHandler = [aid(GetAID())](const K8sEventType &type, const std::shared_ptr<ModelBase> &model) {
litebus::Async(aid, &ScalerActor::HandleNodeModifiedEvent, type, model);
};
(void)member_->kubeClient->RegisterWatchHandler("V1Node", K8sEventType::EVENT_TYPE_MODIFIED,
std::move(nodeEventHandler));
(void)member_->kubeClient->RegisterWatchHandler("V1Node", K8sEventType::EVENT_TYPE_DELETE,
std::move(nodeEventHandler));
(void)member_->kubeClient->ListNode(true);
WatchHandler dHandler = [aid(GetAID())](const K8sEventType &type, const std::shared_ptr<ModelBase> &model) {
litebus::Async(aid, &ScalerActor::OnDeploymentModified, type, model);
};
(void)member_->kubeClient->RegisterWatchHandler("V1Deployment", K8sEventType::EVENT_TYPE_MODIFIED, dHandler);
(void)member_->kubeClient->RegisterWatchHandler("V1Deployment", K8sEventType::EVENT_TYPE_DELETE, dHandler);
(void)member_->kubeClient->ListNamespacedDeployment(member_->k8sNamespace, true);
SyncKubeResourceForChange();
}
void ScalerActor::SyncKubeResourceForChange()
{
(void)member_->kubeClient->ListNamespacedPod(member_->k8sNamespace)
.OnComplete([aid(GetAID())](const litebus::Future<std::shared_ptr<V1PodList>> &podList) {
if (podList.IsError()) {
YRLOG_WARN("failed to list existing pods while init, code({})", podList.GetErrorCode());
return;
}
for (const auto &pod : podList.Get()->GetItems()) {
litebus::Async(aid, &ScalerActor::HandlePodModifiedEvent, K8sEventType::EVENT_TYPE_MODIFIED, pod);
}
});
(void)member_->kubeClient->ListNode().OnComplete(
[aid(GetAID())](const litebus::Future<std::shared_ptr<V1NodeList>> &nodeList) {
if (nodeList.IsError()) {
YRLOG_WARN("failed to list existing node while init, code({})", nodeList.GetErrorCode());
return;
}
for (const auto &node : nodeList.Get()->GetItems()) {
litebus::Async(aid, &ScalerActor::HandleNodeModifiedEvent, K8sEventType::EVENT_TYPE_MODIFIED, node);
}
});
(void)member_->kubeClient->ListNamespacedDeployment(member_->k8sNamespace)
.OnComplete([aid(GetAID())](const litebus::Future<std::shared_ptr<V1DeploymentList>> &deploymentList) {
if (deploymentList.IsError()) {
YRLOG_WARN("failed to list existing node while init, code({})", deploymentList.GetErrorCode());
return;
}
for (const auto &deployment : deploymentList.Get()->GetItems()) {
litebus::Async(aid, &ScalerActor::OnDeploymentModified, K8sEventType::EVENT_TYPE_MODIFIED, deployment);
}
});
}
litebus::Future<Status> ScalerActor::SyncNodes()
{
litebus::Promise<Status> promise;
(void)member_->kubeClient->ListNode().OnComplete(
[aid(GetAID()), promise](const litebus::Future<std::shared_ptr<V1NodeList>> &nodeList) {
if (nodeList.IsError()) {
YRLOG_WARN("failed to list existing node while init, code({})", nodeList.GetErrorCode());
promise.SetFailed(static_cast<int32_t>(StatusCode::FAILED));
return;
}
if (nodeList.Get() == nullptr || nodeList.Get()->GetItems().empty()) {
YRLOG_WARN("failed to list existing node, list empty");
promise.SetFailed(static_cast<int32_t>(StatusCode::FAILED));
return;
}
litebus::Async(aid, &ScalerActor::DoSyncNode, nodeList.Get()).OnComplete([promise]() {
promise.SetValue(Status::OK());
});
});
return promise.GetFuture();
}
litebus::Future<std::unordered_map<std::string, std::shared_ptr<V1Node>>> ScalerActor::DoSyncNode(
const std::shared_ptr<V1NodeList> &nodeList)
{
if (nodeList == nullptr) {
YRLOG_WARN("nodeList is nullptr");
return std::unordered_map<std::string, std::shared_ptr<V1Node>>();
}
if (nodeList->GetItems().empty()) {
YRLOG_WARN("list existing nodes is empty, no need to sync");
return std::unordered_map<std::string, std::shared_ptr<V1Node>>();
}
std::unordered_map<std::string, std::shared_ptr<V1Node>> nodes;
for (const auto &node : nodeList->GetItems()) {
if (node == nullptr) {
continue;
}
auto ip = ParseNodeInternalIP(node);
if (ip.empty()) {
continue;
}
YRLOG_INFO("sync node({}) ip({})", node->GetMetadata()->GetName(), ip);
member_->nodes[ip] = node;
nodes[ip] = node;
}
return nodes;
}
litebus::Future<std::shared_ptr<V1DeploymentList>> ScalerActor::ListNamespacedDeployment(const std::string &rNamespace)
{
if (SyncTemplateDeployment().IsError()) {
YRLOG_WARN("sync template deployment failed");
}
return member_->kubeClient->ListNamespacedDeployment(member_->k8sNamespace);
}
litebus::Future<Status> ScalerActor::SyncDeploymentAndPodPool()
{
litebus::Promise<Status> promise;
(void)ListNamespacedDeployment(member_->k8sNamespace)
.OnComplete([aid(GetAID()), promise](const litebus::Future<std::shared_ptr<V1DeploymentList>> &deploymentList) {
if (deploymentList.IsError() || deploymentList.Get()->GetItems().empty()) {
promise.SetFailed(static_cast<int32_t>(StatusCode::FAILED));
return;
}
promise.Associate(litebus::Async(aid, &ScalerActor::SyncDeployments, deploymentList.Get()));
});
return promise.GetFuture().Then(litebus::Defer(GetAID(), &ScalerActor::SyncPodPoolInfo, std::placeholders::_1));
}
litebus::Future<Status> ScalerActor::SyncPodAndInstance()
{
litebus::Promise<Status> promise;
(void)member_->kubeClient->ListNamespacedPod(member_->k8sNamespace)
.OnComplete([aid(GetAID()), promise](const litebus::Future<std::shared_ptr<V1PodList>> &podList) {
if (podList.IsError()) {
YRLOG_WARN("failed to list existing pods while init, code({})", podList.GetErrorCode());
promise.SetFailed(static_cast<int32_t>(StatusCode::FAILED));
return;
}
if (podList.Get()->GetItems().empty()) {
YRLOG_DEBUG("list existing pods is empty, no need to sync");
promise.SetFailed(static_cast<int32_t>(StatusCode::FAILED));
}
promise.Associate(litebus::Async(aid, &ScalerActor::SyncPods, podList.Get()));
});
return promise.GetFuture().Then(litebus::Defer(GetAID(), &ScalerActor::SyncInstances, std::placeholders::_1));
}
litebus::Future<Status> ScalerActor::SyncPods(const std::shared_ptr<V1PodList> &podList)
{
if (podList == nullptr) {
YRLOG_WARN("podList is nullptr");
return Status::OK();
}
std::unordered_map<std::string, std::shared_ptr<V1Pod>> podMap;
for (const auto &pod : podList->GetItems()) {
if (pod == nullptr || pod->GetMetadata() == nullptr || pod->GetStatus() == nullptr) {
continue;
}
auto podName = pod->GetMetadata()->GetName();
if (!litebus::strings::StartsWithPrefix(podName, ORIGINAL_POOL_NAME) &&
!litebus::strings::StartsWithPrefix(podName, CUSTOM_POD_PREFIX + ORIGINAL_POOL_NAME)) {
continue;
}
YRLOG_DEBUG("sync pod(uid={})(name={})(ip={})", pod->GetMetadata()->GetUid(), podName,
pod->GetStatus()->GetPodIP());
podMap[podName] = pod;
member_->poolManager->OnPodUpdate(pod);
if (member_->frontendManager) {
member_->frontendManager->OnPodUpdate(pod);
}
if (!IsPodReady(pod)) {
SyncCreatePodPromise(pod);
}
}
if (!podMap.empty()) {
member_->podNameMap = podMap;
}
return Status::OK();
}
litebus::Future<Status> ScalerActor::SyncDeployments(const std::shared_ptr<V1DeploymentList> &deploymentList)
{
if (deploymentList == nullptr) {
YRLOG_WARN("deploymentList is nullptr");
return Status::OK();
}
for (const auto &deployment : deploymentList->GetItems()) {
if (deployment == nullptr || deployment->GetMetadata() == nullptr || deployment->GetStatus() == nullptr) {
continue;
}
auto name = deployment->GetMetadata()->GetName();
if (!litebus::strings::StartsWithPrefix(name, ORIGINAL_POOL_NAME)) {
continue;
}
member_->poolManager->PutDeployment(deployment);
}
return Status::OK();
}
void ScalerActor::SyncCreatePodPromise(const std::shared_ptr<V1Pod> &pod)
{
auto uid = pod->GetMetadata()->GetUid();
auto name = pod->GetMetadata()->GetName();
if (waitForReadyPods_.find(uid) != waitForReadyPods_.end()) {
return;
}
litebus::Promise<std::shared_ptr<V1Pod>> readyPod;
waitForReadyPods_[uid] = readyPod;
(void)readyPod.GetFuture()
.After(WAIT_POD_READY_TIMEOUT, [uid, name](const litebus::Future<std::shared_ptr<V1Pod>> &readyPod) {
YRLOG_ERROR("sync pod, wait pod({}) name({}) ready timeout", uid, name);
readyPod.SetFailed(static_cast<int32_t>(StatusCode::GS_START_CREATE_POD_FAILED));
return readyPod;
})
.OnComplete([uid, name, aid(GetAID())](const litebus::Future<std::shared_ptr<V1Pod>> &readyPod) {
if (readyPod.IsError()) {
litebus::Async(aid, &ScalerActor::DeleteFailedPod, uid, name);
}
});
}
litebus::Future<Status> ScalerActor::SyncInstances(const Status &status)
{
if (status.IsError()) {
return status;
}
YRLOG_DEBUG("begin to sync instance from etcd");
auto synced = member_->metaStorageAccessor->Sync(INSTANCE_PATH_PREFIX, true);
UpdateInstanceEvent(synced.first);
return Status::OK();
}
litebus::Future<Status> ScalerActor::SyncPodPoolInfo(const Status &status)
{
if (status.IsError()) {
return status;
}
YRLOG_DEBUG("begin to sync pod pool from etcd");
auto synced = member_->metaStorageAccessor->Sync(POD_POOL_PREFIX, true);
UpdatePodPoolEvent(synced.first);
return Status::OK();
}
void ScalerActor::Finalize()
{
if (member_->kubeClient != nullptr) {
member_->kubeClient = nullptr;
}
if (member_->poolManager != nullptr) {
member_->poolManager = nullptr;
}
member_->frontendManager = nullptr;
}
void ScalerActor::InitLeader()
{
auto masterBusiness = std::make_shared<MasterBusiness>(shared_from_this(), member_);
auto slaveBusiness = std::make_shared<SlaveBusiness>(shared_from_this(), member_);
(void)businesses_.emplace(MASTER_BUSINESS, masterBusiness);
(void)businesses_.emplace(SLAVE_BUSINESS, slaveBusiness);
(void)Explorer::GetInstance().AddLeaderChangedCallback(
"ScalerActor", [aid(GetAID())](const LeaderInfo &leaderInfo) {
litebus::Async(aid, &ScalerActor::UpdateLeaderInfo, leaderInfo);
});
curStatus_ = SLAVE_BUSINESS;
business_ = slaveBusiness;
}
void ScalerActor::Init()
{
InitLeader();
ActorBase::Receive("CreateAgent", &ScalerActor::CreateAgent);
ActorBase::Receive("UpdateNodeTaints", &ScalerActor::UpdateNodeTaint);
ActorBase::Receive("DeletePod", &ScalerActor::DeletePod);
ActorBase::Receive("UpdateLocalStatusResponse", &ScalerActor::UpdateLocalStatusResponse);
}
Status ScalerActor::CreateResourcePools(const std::shared_ptr<std::vector<ResourcePool>> &pools)
{
ASSERT_IF_NULL(business_);
return business_->CreateResourcePools(pools);
}
Status ScalerActor::CreateDeployment(const litebus::Future<std::shared_ptr<V1DeploymentList>> &future,
const std::shared_ptr<std::vector<ResourcePool>> &pools)
{
if (!future.IsOK()) {
YRLOG_ERROR("failed to list deployments in default namespace!");
return Status(StatusCode::GS_START_CREATE_DEPLOYMENTS_FAILED, "failed to list deployment");
}
YRLOG_INFO("succeed to list deployments in default namespace!");
if (SyncTemplateDeployment().IsError()) {
YRLOG_ERROR("function-agent deployment was not found");
return Status(StatusCode::GS_START_CREATE_DEPLOYMENTS_FAILED, "failed to find template deployment");
}
const auto &deployments = future.Get();
for (auto &deployment : deployments->GetItems()) {
if (litebus::strings::StartsWithPrefix(deployment.get()->GetMetadata()->GetName(), ORIGINAL_POOL_NAME)) {
member_->poolManager->PutDeployment(deployment);
}
}
for (auto &pool : *pools) {
auto deploymentIter = member_->poolManager->GetDeployment(PoolDeploymentName(pool.name));
if (deploymentIter != nullptr) {
YRLOG_WARN("function-agent pool {}'s deployment already exists.", pool.name);
CheckExistingDeployment(deploymentIter, pool);
continue;
}
pool.initLabels = DEFAULT_INIT_LABELS;
YRLOG_INFO("function-agent pool {} does not exist, creating...", pool.name);
auto newDeploy = ScalerActor::GenerateDeploymentByTemplate(pool, member_->templateDeployment);
(void)member_->kubeClient
->CreateNamespacedDeployment(member_->templateDeployment->GetMetadata()->GetRNamespace(), newDeploy)
.OnComplete(
litebus::Defer(GetAID(), &ScalerActor::OnCreateDeploymentComplete, std::placeholders::_1, pool.name));
}
return Status(StatusCode::SUCCESS);
}
void ScalerActor::CheckExistingDeployment(const std::shared_ptr<V1Deployment> &deployment, const ResourcePool &pool)
{
if (ScalerActor::IsSameDeploymentConfig(pool, deployment)) {
YRLOG_INFO("function-agent pool {}'s deployment config is same as loaded pool, skip.", pool.name);
return;
}
auto newDeployment = ScalerActor::GenerateDeploymentByOrigin(pool, deployment);
auto changedDeploymentJson = newDeployment->ToJson();
nlohmann::json deploymentDiffPatch = nlohmann::json::diff(deployment->ToJson(), changedDeploymentJson);
std::shared_ptr<kube_client::model::Object> body = std::make_shared<kube_client::model::Object>();
bool isSet = body->FromJson(deploymentDiffPatch);
if (!isSet) {
YRLOG_WARN("failed to set object from json patch({}) for pool {}, skip", deploymentDiffPatch.dump(), pool.name);
return;
}
YRLOG_WARN("function-agent pool {}'s deployment config is different with loaded pool, will be updated.", pool.name);
(void)member_->kubeClient
->PatchNamespacedDeployment(newDeployment->GetMetadata()->GetName(),
newDeployment->GetMetadata()->GetRNamespace(), body)
.OnComplete(
litebus::Defer(GetAID(), &ScalerActor::OnPatchDeploymentComplete, std::placeholders::_1, pool.name));
}
litebus::Future<std::shared_ptr<V1Pod>> ScalerActor::CreatePod(const std::string &name,
const std::shared_ptr<V1PodTemplateSpec> &spec,
const std::string &instanceID,
const std::map<std::string, std::string> &labels,
const std::map<std::string, std::string> &annotations)
{
if (member_->templateDeployment == nullptr) {
YRLOG_WARN("template deployment is null, aborted create pod by pool");
litebus::Promise<std::shared_ptr<V1Pod>> promise;
promise.SetFailed(static_cast<int32_t>(StatusCode::FUNC_AGENT_DEPLOYMENT_CONFIG_NOT_FOUND));
return promise.GetFuture();
}
auto pod = std::make_shared<V1Pod>();
pod->SetMetadata(spec->GetMetadata());
pod->GetMetadata()->SetName(name);
pod->GetMetadata()->SetOwnerReferences({});
auto &podLabels = pod->GetMetadata()->GetLabels();
for (auto &label : labels) {
podLabels[label.first] = label.second;
}
podLabels[APP_LABEL_KEY] = FUNCTION_AGENT_DYNAMIC;
if (!pod->GetMetadata()->AnnotationsIsSet()) {
pod->GetMetadata()->SetAnnotations({});
}
auto &podAnnotations = pod->GetMetadata()->GetAnnotations();
for (auto &annotation : annotations) {
podAnnotations[annotation.first] = annotation.second;
}
podAnnotations[ANNOTATION_PLACEHOLDER] = ANNOTATION_PLACEHOLDER;
pod->SetSpec(spec->GetSpec());
pod->UnsetApiVersion();
pod->UnsetKind();
pod->UnsetStatus();
if (auto iter = member_->instanceId2PodName.find(instanceID); iter != member_->instanceId2PodName.end()) {
(void)DoDeletePod(iter->second);
member_->podName2InstanceId.erase(iter->second);
}
member_->instanceId2PodName[instanceID] = pod->GetMetadata()->GetName();
member_->podName2InstanceId[pod->GetMetadata()->GetName()] = instanceID;
return member_->kubeClient->CreateNamespacedPod(member_->templateDeployment->GetMetadata()->GetRNamespace(), pod);
}
bool ScalerActor::IsSameDeploymentConfig(const ResourcePool &pool, const std::shared_ptr<V1Deployment> &deployment)
{
if (deployment->GetSpec()->GetReplicas() != pool.poolSize) {
return false;
}
auto containers = deployment->GetSpec()->GetRTemplate()->GetSpec()->GetContainers();
for (auto &container : containers) {
if (container->GetName() == RUNTIME_MANAGER_CONTAINER_NAME) {
auto requestResources = container->GetResources()->GetRequests();
if (requestResources.find(CPU_RESOURCE) == requestResources.end() ||
pool.requestResources.find(CPU_RESOURCE) == pool.requestResources.end() ||
requestResources[CPU_RESOURCE] != pool.requestResources.at(CPU_RESOURCE)) {
return false;
}
if (requestResources.find(MEMORY_RESOURCE) == requestResources.end() ||
pool.requestResources.find(MEMORY_RESOURCE) == pool.requestResources.end() ||
requestResources[MEMORY_RESOURCE] != pool.requestResources.at(MEMORY_RESOURCE)) {
return false;
}
auto limitResources = container->GetResources()->GetLimits();
if (limitResources.find(MEMORY_RESOURCE) == limitResources.end() ||
pool.limitResources.find(MEMORY_RESOURCE) == pool.limitResources.end() ||
limitResources[MEMORY_RESOURCE] != pool.limitResources.at(MEMORY_RESOURCE)) {
return false;
}
if (limitResources.find(MEMORY_RESOURCE) == limitResources.end() ||
pool.limitResources.find(MEMORY_RESOURCE) == pool.limitResources.end() ||
limitResources[MEMORY_RESOURCE] != pool.limitResources.at(MEMORY_RESOURCE)) {
return false;
}
}
}
return true;
}
void ScalerActor::GetDsWorkerPort(const std::shared_ptr<V1Deployment> &deployment)
{
for (const auto &container : deployment->GetSpec()->GetRTemplate()->GetSpec()->GetContainers()) {
if (container->GetName() != RUNTIME_MANAGER_CONTAINER_NAME) {
continue;
}
for (const auto &env : container->GetEnv()) {
if (env->GetName() == "DS_WORKER_PORT") {
dsWorkPort_ = env->GetValue();
break;
}
}
break;
}
}
void ScalerActor::AddDataSystemIpAndPort(const std::shared_ptr<V1Container> &container)
{
std::vector<std::shared_ptr<V1EnvVar>> &env = container->GetEnv();
auto hostFieldRef = std::make_shared<V1ObjectFieldSelector>();
hostFieldRef->SetApiVersion("v1");
hostFieldRef->SetFieldPath("status.hostIP");
auto hostFrom = std::make_shared<V1EnvVarSource>();
hostFrom->SetFieldRef(hostFieldRef);
auto datasystemHost = std::make_shared<V1EnvVar>();
datasystemHost->SetName("DATASYSTEM_HOST");
datasystemHost->SetValueFrom(hostFrom);
(void)env.emplace_back(datasystemHost);
auto datasystemPort = std::make_shared<V1EnvVar>();
datasystemPort->SetName("DATASYSTEM_PORT");
datasystemPort->SetValue(dsWorkPort_);
(void)env.emplace_back(datasystemPort);
container->SetEnv(env);
}
std::shared_ptr<V1Deployment> ScalerActor::GenerateDeploymentByTemplate(
const ResourcePool &pool, const std::shared_ptr<V1Deployment> &templateDeployment)
{
auto deployment = std::make_shared<V1Deployment>();
if (!deployment->FromJson(templateDeployment->ToJson())) {
YRLOG_WARN("failed to decode from template deployment.");
}
SetMetaData(pool, deployment);
deployment->GetSpec()->SetReplicas(pool.isScalable ? 0 : pool.poolSize);
deployment->GetSpec()->GetSelector()->GetMatchLabels()[APP_LABEL_KEY] = PoolDeploymentName(pool.name);
auto rt = deployment->GetSpec()->GetRTemplate();
SetAffinityForPool(pool, rt);
for (auto &container : rt->GetSpec()->GetContainers()) {
if (container->GetName() == RUNTIME_MANAGER_CONTAINER_NAME) {
if (pool.delegateRuntimeManager.IsSome()) {
auto newImage = pool.delegateRuntimeManager.Get()->GetImage();
if (!newImage.empty()) {
container->SetImage(newImage);
}
auto extraEnv = pool.delegateRuntimeManager.Get()->GetEnv();
for (auto &env : extraEnv) {
container->GetEnv().push_back(env);
}
}
for (const auto &volMount : pool.delegateVolumeMounts) {
container->GetVolumeMounts().push_back(volMount);
}
UpdateContainerResource(container, pool);
SetResourceEnvs(container, pool);
} else if (container->GetName() == FUNCTION_AGENT_CONTAINER_NAME) {
for (const auto &volMount : pool.delegateAgentVolumeMounts) {
container->GetVolumeMounts().push_back(volMount);
}
}
}
AddDelegateConfig(pool, rt);
rt->GetMetadata()->GetLabels().emplace(REUSE_LABEL_KEY, pool.isReuse ? "true" : "false");
for (const auto &label : pool.delegatePoolLabels) {
if (label.first.empty()) {
continue;
}
rt->GetMetadata()->GetLabels().emplace(label.first, label.second);
}
deployment->GetMetadata()->SetOwnerReferences({});
if (pool.podPendingDurationThreshold > 0) {
rt->GetMetadata()->GetAnnotations().emplace(PENDING_THRESHOLD,
std::to_string(pool.podPendingDurationThreshold));
}
return deployment;
}
std::shared_ptr<V1PodTemplateSpec> ScalerActor::GeneratePodTemplateSpec(
const ResourcePool &pool, const std::shared_ptr<V1Deployment> &templateDeployment)
{
auto deployment = std::make_shared<V1Deployment>();
if (!deployment->FromJson(templateDeployment->ToJson())) {
YRLOG_WARN("failed to decode from template deployment.");
}
auto rt = deployment->GetSpec()->GetRTemplate();
for (auto &container: rt->GetSpec()->GetContainers()) {
if (container->GetName() != FUNCTION_AGENT_CONTAINER_NAME) {
continue;
}
for (const auto &mount : pool.delegateAgentVolumeMounts) {
container->GetVolumeMounts().push_back(mount);
}
}
for (auto &container : rt->GetSpec()->GetContainers()) {
if (container->GetName() != RUNTIME_MANAGER_CONTAINER_NAME) {
continue;
}
if (pool.delegateContainer.IsNone()) {
UpdateContainerResource(container, pool);
}
SetResourceEnvs(container, pool);
if (pool.delegateContainer.IsSome() && pool.isNeedBindCPU) {
container->GetResources()->GetLimits()[CPU_RESOURCE] = CPU_BIND_CPU_VALUE;
container->GetResources()->GetLimits()[MEMORY_RESOURCE] = CPU_BIND_MEM_VALUE;
container->GetResources()->GetRequests()[CPU_RESOURCE] = CPU_BIND_CPU_VALUE;
container->GetResources()->GetRequests()[MEMORY_RESOURCE] = CPU_BIND_MEM_VALUE;
}
for (const auto &mount : pool.delegateVolumeMounts) {
container->GetVolumeMounts().push_back(mount);
}
if (pool.delegateRuntimeManager.IsSome()) {
auto newImage = pool.delegateRuntimeManager.Get()->GetImage();
if (!newImage.empty()) {
container->SetImage(newImage);
}
auto currEnv = container->GetEnv();
auto extraEnv = pool.delegateRuntimeManager.Get()->GetEnv();
for (auto &env : extraEnv) {
currEnv.push_back(env);
}
container->SetEnv(currEnv);
}
break;
}
if (pool.delegateContainer.IsSome()) {
auto delegateContainer = pool.delegateContainer.Get();
delegateContainer->SetName(DELEGATE_CONTAINER_NAME);
AddDataSystemIpAndPort(delegateContainer);
for (const auto &delegateDecryptEnv : pool.delegateDecryptEnvs) {
(void)delegateContainer->GetEnv().emplace_back(delegateDecryptEnv);
}
UpdateContainerResource(delegateContainer, pool);
rt->GetSpec()->GetContainers().emplace(rt->GetSpec()->GetContainers().begin(), delegateContainer);
rt->GetSpec()->SetTerminationGracePeriodSeconds(pool.terminationGracePeriodSeconds);
if (delegateContainer->ServiceAccountNameIsSet()) {
rt->GetSpec()->SetServiceAccountName(delegateContainer->GetServiceAccountName());
}
}
AddDelegateConfig(pool, rt);
AddAffinityForPodSpec(pool.affinity, rt, pool.isAggregationNodeAffinity);
if (rt->GetSpec()->SecurityContextIsSet() && pool.delegateSecCompProfile != nullptr) {
rt->GetSpec()->GetSecurityContext()->SetSeccompProfile(pool.delegateSecCompProfile);
}
if (pool.isSystemFunc) {
rt->GetSpec()->SetAutomountServiceAccountToken(true);
rt->GetSpec()->SetServiceAccountName(SYSTEM_FUNCTION_SERVICE_ACCOUNT_NAME);
rt->GetSpec()->SetPriorityClassName(SYSTEM_CLUSTER_CRITICAL_CLASS_NAME);
}
return rt;
}
void ScalerActor::AddDelegateConfig(const ResourcePool &pool, const std::shared_ptr<V1PodTemplateSpec> &rt) const
{
SetHostAliases(rt->GetSpec(), pool.delegateHostAliases);
for (const auto &volume : pool.delegateVolumes) {
rt->GetSpec()->GetVolumes().push_back(volume);
}
for (const auto &sideCar : pool.delegateSidecars) {
rt->GetSpec()->GetContainers().push_back(sideCar);
}
for (const auto &initContainer : rt->GetSpec()->GetInitContainers()) {
if (initContainer->GetName() != FUNCTION_AGENT_INIT_CONTAINER_NAME) {
continue;
}
if (!pool.initImage.empty()) {
initContainer->SetImage(pool.initImage);
}
for (const auto &volumeMount : pool.delegateInitVolumeMounts) {
initContainer->GetVolumeMounts().push_back(volumeMount);
}
if (!initContainer->EnvIsSet()) {
initContainer->SetEnv({});
}
for (const auto &env : pool.delegateInitEnvs) {
initContainer->GetEnv().push_back(env);
}
}
for (const auto &initContainer : pool.delegateInitContainers) {
rt->GetSpec()->GetInitContainers().push_back(initContainer);
}
if (!rt->GetSpec()->TolerationsIsSet()) {
rt->GetSpec()->SetTolerations({});
}
for (const auto &toleration : pool.delegateTolerations) {
rt->GetSpec()->GetTolerations().push_back(toleration);
}
if (!rt->GetSpec()->NodeSelectorIsSet()) {
rt->GetSpec()->SetNodeSelector({});
}
for (const auto &nodeSelector: pool.nodeSelector) {
rt->GetSpec()->GetNodeSelector()[nodeSelector.first] = nodeSelector.second;
}
if (!pool.runtimeClassName.empty()) {
rt->GetSpec()->SetRuntimeClassName(pool.runtimeClassName);
}
if (!rt->GetSpec()->TopologySpreadConstraintsIsSet()) {
rt->GetSpec()->SetTopologySpreadConstraints({});
}
for (const auto &topologySpreadConstraint : pool.delegateTopologySpreadConstraints) {
rt->GetSpec()->GetTopologySpreadConstraints().push_back(topologySpreadConstraint);
}
}
void ScalerActor::SetAffinityForPool(const ResourcePool &pool, const std::shared_ptr<V1PodTemplateSpec> &spec)
{
if (!spec->GetSpec()->AffinityIsSet()) {
spec->GetSpec()->SetAffinity(std::make_shared<V1Affinity>());
}
if (!spec->GetSpec()->GetAffinity()->PodAntiAffinityIsSet()) {
auto label = PoolDeploymentName(pool.name);
std::map<std::string, std::string> labels;
(void)labels.emplace(APP_LABEL_KEY, label);
spec->GetMetadata()->SetLabels(labels);
std::unordered_map<AffinityType, std::vector<std::string>> affinityTypeLabels;
(void)affinityTypeLabels[AffinityType::PreferredAntiAffinity].emplace_back(APP_LABEL_KEY + ":" + label);
auto affinity = ParseAffinity(affinityTypeLabels);
spec->GetSpec()->GetAffinity()->SetPodAntiAffinity(affinity->GetPodAntiAffinity());
}
AddAffinityForPodSpec(pool.affinity, spec, pool.isAggregationNodeAffinity);
}
void ScalerActor::SetMetaData(const ResourcePool &pool, const std::shared_ptr<V1Deployment> &deployment)
{
deployment->GetMetadata()->SetName(PoolDeploymentName(pool.name));
std::map<std::string, std::string> labels;
(void)labels.emplace(APP_LABEL_KEY, PoolDeploymentName(pool.name));
deployment->GetMetadata()->SetLabels(labels);
deployment->GetMetadata()->UnsetCreationTimestamp();
deployment->GetMetadata()->UnsetGeneration();
deployment->GetMetadata()->UnsetResourceVersion();
deployment->GetMetadata()->UnSetSelfLink();
deployment->GetMetadata()->UnsetUid();
}
void ScalerActor::SetResourceEnvs(const std::shared_ptr<V1Container> &container, const ResourcePool &pool)
{
for (auto &env : container->GetEnv()) {
if (env->GetName() == CPU4COMP_ENV && pool.limitResources.find(CPU_RESOURCE) != pool.limitResources.end()) {
auto cpuRes = pool.limitResources.at(CPU_RESOURCE);
auto cpuVal = ParseCPUEnv(cpuRes);
if (!cpuVal.empty()) {
env->SetValue(cpuVal);
}
} else if (env->GetName() == MEM4COMP_ENV &&
pool.limitResources.find(MEMORY_RESOURCE) != pool.limitResources.end()) {
auto memRes = pool.limitResources.at(MEMORY_RESOURCE);
auto memVal = ParseMemoryEnv(memRes);
if (!memVal.empty()) {
env->SetValue(memVal);
}
} else if (env->GetName() == INIT_LABELS_ENV) {
env->SetValue(pool.initLabels);
}
}
}
void ScalerActor::OnCreateDeploymentComplete(const litebus::Future<std::shared_ptr<V1Deployment>> &retDeployment,
const std::string &poolName)
{
if (!retDeployment.IsOK()) {
YRLOG_WARN("failed to create deployment for pool {}, error code: {}", poolName, retDeployment.GetErrorCode());
return;
}
YRLOG_INFO("succeed to create deployment for pool {}", poolName);
auto createdDeployment = retDeployment.Get();
if (createdDeployment != nullptr) {
member_->poolManager->PutDeployment(createdDeployment);
}
DoScaleUpPodByPoolID(poolName, true, "", litebus::AID());
}
void ScalerActor::OnCreatePodPoolComplete(const litebus::Future<std::shared_ptr<V1Deployment>> &retDeployment,
const std::shared_ptr<PodPoolInfo> &poolInfo)
{
if (!retDeployment.IsOK()) {
YRLOG_WARN("failed to create pod pool {}, error code: {}", poolInfo->id, retDeployment.GetErrorCode());
member_->kubeClient->DeleteNamespacedHorizontalPodAutoscaler(POOL_NAME_PREFIX + poolInfo->id + "-hpa",
member_->k8sNamespace, litebus::None(), litebus::None());
poolInfo->status = static_cast<int32_t>(PoolState::FAILED);
poolInfo->readyCount = 0;
poolInfo->msg = "Create Deployment Failed, ErrorCode is " + std::to_string(retDeployment.GetErrorCode());
DoPersistencePoolInfo(poolInfo->id);
return;
}
DoPersistencePoolInfo(poolInfo->id);
YRLOG_INFO("succeed to create pod pool {}", poolInfo->id);
auto createdDeployment = retDeployment.Get();
if (createdDeployment != nullptr) {
member_->poolManager->PutDeployment(createdDeployment);
}
}
void ScalerActor::OnPatchDeploymentComplete(const litebus::Future<std::shared_ptr<V1Deployment>> &retDeployment,
const std::string &poolName)
{
if (!retDeployment.IsOK()) {
YRLOG_WARN("failed to patch deployment for pool {}, error code: {}", poolName, retDeployment.GetErrorCode());
return;
}
YRLOG_INFO("succeed to patch deployment for pool {}", poolName);
auto updatedDeployment = retDeployment.Get();
member_->poolManager->PutDeployment(updatedDeployment);
}
void ScalerActor::OnUpdatePodPoolComplete(const litebus::Future<std::shared_ptr<V1Deployment>> &retDeployment,
const std::shared_ptr<PodPoolInfo> &podPool)
{
if (!retDeployment.IsOK()) {
YRLOG_WARN("failed to update pod pool {}, error code: {}", podPool->id, retDeployment.GetErrorCode());
podPool->msg = "Update Deployment Failed, ErrorCode is " + retDeployment.GetErrorCode();
} else {
YRLOG_INFO("succeed to update pod pool {}", podPool->id);
}
DoPersistencePoolInfo(podPool->id);
}
litebus::Future<Status> ScalerActor::DoDeletePod(const std::string &deletePodName, bool isForce)
{
if (deletePodName.empty()) {
return Status::OK();
}
auto promise = std::make_shared<litebus::Promise<Status>>();
YRLOG_INFO("function-agent pool pod({}) is going to be deleted!", deletePodName);
(void)DoDeleteNamespacedPod(deletePodName, member_->k8sNamespace, GetPodDeleteOptions(deletePodName, isForce),
litebus::Option<bool>(false))
.OnComplete([deletePodName, promise,
poolManager(member_->poolManager)](const litebus::Future<std::shared_ptr<V1Pod>> &retDeletedPod) {
if (!retDeletedPod.IsOK() && retDeletedPod.GetErrorCode() != litebus::http::ResponseCode::NOT_FOUND) {
promise->SetFailed(retDeletedPod.GetErrorCode());
YRLOG_ERROR("failed to delete pod {}, error code: {}", deletePodName, retDeletedPod.GetErrorCode());
return;
}
promise->SetValue(Status::OK());
YRLOG_INFO("succeed to delete pod {}", deletePodName);
if (poolManager != nullptr && retDeletedPod.IsOK()) {
poolManager->OnPodDelete(retDeletedPod.Get(), false);
}
});
return promise->GetFuture();
}
litebus::Future<std::shared_ptr<V1Pod>> ScalerActor::DoDeleteNamespacedPod(
const std::string &podName, const std::string &rNamespace,
const litebus::Option<std::shared_ptr<V1DeleteOptions>> &body, const litebus::Option<bool> &orphanDependents)
{
return member_->kubeClient->DeleteNamespacedPod(podName, rNamespace, body, orphanDependents);
}
litebus::Option<std::shared_ptr<V1DeleteOptions>> ScalerActor::GetPodDeleteOptions(const std::string &podName,
bool isForce)
{
std::shared_ptr<V1DeleteOptions> deleteOptions = std::make_shared<V1DeleteOptions>();
if (isForce) {
deleteOptions->SetGracePeriodSeconds(member_->gracePeriodSeconds);
}
deleteOptions->SetPropagationPolicy("Foreground");
return litebus::Option<std::shared_ptr<V1DeleteOptions>>(deleteOptions);
}
void ScalerActor::OnCreatePodComplete(const litebus::Future<std::shared_ptr<V1Pod>> &retPod,
const std::shared_ptr<messages::CreateAgentRequest> &createAgentRequest,
const litebus::AID &from, bool needWaitPodReady)
{
if (!retPod.IsOK()) {
YRLOG_WARN("{}|failed to create pod for instance({}), error code: {}",
createAgentRequest->instanceinfo().requestid(), createAgentRequest->instanceinfo().instanceid(),
retPod.GetErrorCode());
SendCreateAgentFailedResponse(from, static_cast<int32_t>(StatusCode::GS_START_CREATE_POD_FAILED),
"failed to create pod, error code: " + std::to_string(retPod.GetErrorCode()),
*createAgentRequest);
return;
}
if (!needWaitPodReady) {
YRLOG_INFO("{}|succeed to create pod({}) for instance({})", createAgentRequest->instanceinfo().requestid(),
retPod.Get()->GetMetadata()->GetName(), createAgentRequest->instanceinfo().instanceid());
(void)member_->creatingRequests.erase(createAgentRequest->instanceinfo().requestid());
auto createAgentResponse =
InitCreateAgentResponse(static_cast<int32_t>(StatusCode::SUCCESS), "", *createAgentRequest);
(void)Send(from, "CreateAgentResponse", createAgentResponse.SerializeAsString());
return;
}
YRLOG_INFO("{}|succeed to create pod for instance({}), wait for pod({}) ready",
createAgentRequest->instanceinfo().requestid(), createAgentRequest->instanceinfo().instanceid(),
retPod.Get()->GetMetadata()->GetName());
litebus::Promise<std::shared_ptr<V1Pod>> readyPod;
waitForReadyPods_[retPod.Get()->GetMetadata()->GetUid()] = readyPod;
(void)readyPod.GetFuture()
.After(WAIT_POD_READY_TIMEOUT,
[createAgentRequest, uid(retPod.Get()->GetMetadata()->GetUid())](
const litebus::Future<std::shared_ptr<V1Pod>> &readyPod) {
YRLOG_ERROR("{}|wait pod({}) ready failed, instance({}) timeout",
createAgentRequest->instanceinfo().requestid(), uid,
createAgentRequest->instanceinfo().instanceid());
readyPod.SetFailed(static_cast<int32_t>(StatusCode::GS_START_CREATE_POD_FAILED));
return readyPod;
})
.OnComplete(litebus::Defer(GetAID(), &ScalerActor::HandlerWaitPodReady, std::placeholders::_1,
createAgentRequest, retPod.Get()->GetMetadata()->GetUid(),
retPod.Get()->GetMetadata()->GetName(), from));
}
void ScalerActor::OnScaleUpPodComplete(const litebus::Future<std::shared_ptr<V1Pod>> &retPod,
const std::shared_ptr<V1Pod> &srcPod, const std::string &poolID,
const std::string &requestID, const litebus::AID &from)
{
if (!retPod.IsOK()) {
YRLOG_WARN("{}|failed to scale up pod for pool({}), error code: {}", requestID, poolID, retPod.GetErrorCode());
if (!requestID.empty()) {
(void)member_->creatingRequests.erase(requestID);
(void)Send(
from, "CreateAgentResponse",
GenCreateAgentResponse(requestID, static_cast<int32_t>(StatusCode::GS_START_CREATE_POD_FAILED),
"failed to create pod, error code: " + std::to_string(retPod.GetErrorCode()))
.SerializeAsString());
}
member_->poolManager->OnPodDelete(srcPod);
return;
}
YRLOG_INFO("{}|succeed to scale up pod({}) for pool({})", requestID, srcPod->GetMetadata()->GetName(), poolID);
if (!requestID.empty()) {
litebus::Promise<std::shared_ptr<V1Pod>> readyPod;
waitForReadyPods_[retPod.Get()->GetMetadata()->GetUid()] = std::move(readyPod);
(void)readyPod.GetFuture()
.After(WAIT_POD_READY_TIMEOUT, [requestID, podName(retPod.Get()->GetMetadata()->GetName())](
const litebus::Future<std::shared_ptr<V1Pod>> &readyPod) {
YRLOG_ERROR("{}|wait pod({}) ready failed, timeout", requestID, podName);
readyPod.SetFailed(static_cast<int32_t>(StatusCode::GS_START_CREATE_POD_FAILED));
return readyPod;
})
.OnComplete(litebus::Defer(GetAID(), &ScalerActor::HandlerWaitPoolPodReady, std::placeholders::_1,
requestID, retPod.Get()->GetMetadata()->GetUid(),
retPod.Get()->GetMetadata()->GetName(), from));
}
member_->poolManager->OnPodUpdate(srcPod);
}
void ScalerActor::HandlerWaitPodReady(const litebus::Future<std::shared_ptr<V1Pod>> &readyPod,
const std::shared_ptr<messages::CreateAgentRequest> &createAgentRequest,
const std::string &uid, const std::string &name, const litebus::AID &from)
{
if (readyPod.IsError()) {
YRLOG_ERROR("{}|wait pod({}) ready failed, instance({}), error: {}",
createAgentRequest->instanceinfo().requestid(), uid,
createAgentRequest->instanceinfo().instanceid(), readyPod.GetErrorCode());
SendCreateAgentFailedResponse(
from, static_cast<int32_t>(StatusCode::GS_START_CREATE_POD_FAILED),
"wait for pod(" + uid + ") ready failed, error: " + std::to_string(readyPod.GetErrorCode()),
*createAgentRequest);
DeleteFailedPod(uid, name);
return;
}
auto createAgentResponse =
InitCreateAgentResponse(static_cast<int32_t>(StatusCode::SUCCESS), "", *createAgentRequest);
auto containerStatuses = readyPod.Get()->GetStatus()->GetContainerStatuses();
for (auto containerStatus : containerStatuses) {
if (containerStatus->NameIsSet() && containerStatus->GetName() == DELEGATE_CONTAINER_NAME) {
std::string idWithPrefix = containerStatus->GetContainerID();
auto containerId = idWithPrefix.substr(CONTAINER_ID_PREFIX.size());
YRLOG_DEBUG("{}|success to get delegate container({}) for instance({})",
createAgentRequest->instanceinfo().requestid(), containerId,
createAgentRequest->instanceinfo().instanceid());
(*createAgentResponse.mutable_updatedcreateoptions())[DELEGATE_CONTAINER_ID_KEY] = containerId;
}
}
YRLOG_INFO("{}|succeed to create pod for instance({})", createAgentRequest->instanceinfo().requestid(),
createAgentRequest->instanceinfo().instanceid());
(void)member_->creatingRequests.erase(createAgentRequest->instanceinfo().requestid());
(void)Send(from, "CreateAgentResponse", createAgentResponse.SerializeAsString());
}
void ScalerActor::HandlerWaitPoolPodReady(const litebus::Future<std::shared_ptr<V1Pod>> &readyPod,
const std::string &requestID, const std::string &uid, const std::string &name,
const litebus::AID &from)
{
auto createResp = GenCreateAgentResponse(requestID, static_cast<int32_t>(StatusCode::SUCCESS), "");
if (readyPod.IsError()) {
createResp = GenCreateAgentResponse(requestID, static_cast<int32_t>(StatusCode::GS_START_CREATE_POD_FAILED),
"timeout to scale up pod");
DeleteFailedPod(uid, name);
} else {
YRLOG_INFO("{}|succeed to create pod({}), pod is ready", requestID, name);
}
(void)member_->creatingRequests.erase(requestID);
waitForReadyPods_.erase(uid);
(void)Send(from, "CreateAgentResponse", createResp.SerializeAsString());
}
void ScalerActor::DeleteFailedPod(const std::string &uid, const std::string &podName)
{
YRLOG_INFO("{}|failed to waited pod ready, start to delete pod({})", uid, podName);
(void)DoDeleteNamespacedPod(podName, member_->k8sNamespace, GetPodDeleteOptions(podName))
.OnComplete([podName](const litebus::Future<std::shared_ptr<V1Pod>> &retDeletedPod) {
if (!retDeletedPod.IsOK()) {
YRLOG_ERROR("failed to delete pod({}), error code: {}", podName, retDeletedPod.GetErrorCode());
return;
}
YRLOG_INFO("succeed to delete pod({})", podName);
});
(void)waitForReadyPods_.erase(uid);
}
void ScalerActor::HandlePodModifiedEvent(const K8sEventType &type, const std::shared_ptr<ModelBase> &model)
{
OnPodModified(type, model);
}
void ScalerActor::HandleNodeModifiedEvent(const K8sEventType &type, const std::shared_ptr<ModelBase> &model)
{
OnNodeModified(type, model);
}
void ScalerActor::UpdateMemberOnPodDeleted(const std::shared_ptr<V1Pod> &pod)
{
auto podName = pod->GetMetadata()->GetName();
(void)member_->podName2InstanceId.erase(podName);
(void)member_->podNameMap.erase(podName);
(void)member_->pendingPods.erase(podName);
if (member_->nodeIdPodNameMap.find(pod->GetSpec()->GetNodeName()) != member_->nodeIdPodNameMap.end()) {
(void)member_->nodeIdPodNameMap[pod->GetSpec()->GetNodeName()].erase(podName);
}
if (member_->migratingPodSet.find(podName) != member_->migratingPodSet.end()) {
(void)member_->migratingPodSet.erase(podName);
}
}
void ScalerActor::UpdateMemberOnPodModified(const std::shared_ptr<V1Pod> &pod)
{
auto podName = pod->GetMetadata()->GetName();
member_->podNameMap[podName] = pod;
if (member_->nodeIdPodNameMap.find(pod->GetSpec()->GetNodeName()) != member_->nodeIdPodNameMap.end()) {
(void)member_->nodeIdPodNameMap[pod->GetSpec()->GetNodeName()].emplace(podName);
} else {
member_->nodeIdPodNameMap[pod->GetSpec()->GetNodeName()] = std::set<std::string>{ podName };
}
}
void ScalerActor::HandleReadyPodOnModified(const K8sEventType &type, const std::shared_ptr<V1Pod> &pod)
{
if (!IsPodReady(pod)) {
YRLOG_INFO("receive pod({}) modify event({}), and the pod is not ready", pod->GetMetadata()->GetUid(),
fmt::underlying(type));
return;
}
if (auto iter = waitForReadyPods_.find(pod->GetMetadata()->GetUid()); iter != waitForReadyPods_.end()) {
iter->second.SetValue(pod);
(void)waitForReadyPods_.erase(iter);
}
}
void ScalerActor::OnPodModified(const K8sEventType &type, const std::shared_ptr<ModelBase> &model)
{
auto podPtr = dynamic_cast<V1Pod *>(&(*model));
if (podPtr == nullptr) {
return;
}
auto pod = std::make_shared<V1Pod>(*podPtr);
auto podName = pod->GetMetadata()->GetName();
if (podName.find(DS_WORKER_NAME) != std::string::npos) {
return;
}
if (podName.find(ORIGINAL_POOL_NAME) == std::string::npos) {
return;
}
auto uid = pod->GetMetadata()->GetUid();
YRLOG_INFO("receive pod(uid={})(name={}) modify event({})", uid, pod->GetMetadata()->GetName(),
fmt::underlying(type));
if (type == K8sEventType::EVENT_TYPE_DELETE) {
UpdateMemberOnPodDeleted(pod);
member_->poolManager->OnPodDelete(pod);
if (member_->frontendManager) {
member_->frontendManager->OnPodDelete(pod);
}
return;
}
ASSERT_IF_NULL(business_);
business_->HandleAbnormalPod(pod);
business_->DeletePodNotBindInstance(pod);
business_->UpdatePodLabelsWithoutInstance(pod);
UpdateMemberOnPodModified(pod);
member_->poolManager->OnPodUpdate(pod);
if (member_->frontendManager) {
member_->frontendManager->OnPodUpdate(pod);
}
business_->MigratePodInstanceWithTaints(pod->GetStatus()->GetHostIP(), podName);
HandleReadyPodOnModified(type, pod);
}
void ScalerActor::MigratePodInstanceWithTaints(const std::string &hostIP, const std::string &podName)
{
if (member_->nodes.find(hostIP) == member_->nodes.end()) {
return;
}
auto type = CheckNodeFault(member_->nodes[hostIP]);
if (type == MigrateType::NOT_MIGRATE) {
return;
}
DoPodInstanceMigrate(podName, type);
}
void ScalerActor::DeletePodNotBindInstance(const std::shared_ptr<V1Pod> &pod)
{
if (!CheckPodNeedDelete(pod)) {
return;
}
auto podName = pod->GetMetadata()->GetName();
(void)DoDeleteNamespacedPod(podName, member_->k8sNamespace, GetPodDeleteOptions(podName))
.OnComplete([podName](const litebus::Future<std::shared_ptr<V1Pod>> &retDeletedPod) {
if (!retDeletedPod.IsOK()) {
YRLOG_ERROR("failed to delete pod {}, error code: {}", podName, retDeletedPod.GetErrorCode());
return;
}
YRLOG_INFO("succeed to delete pod {}", podName);
});
}
bool ScalerActor::CheckPodNeedDelete(const std::shared_ptr<V1Pod> &pod)
{
if (auto it = member_->podName2InstanceId.find(pod->GetMetadata()->GetName());
it != member_->podName2InstanceId.end()) {
auto instanceID = it->second;
if (deletingPodSet_.find(instanceID) != deletingPodSet_.end()) {
YRLOG_WARN("pod for instance({}) is deleting, skipped", instanceID);
return false;
}
auto instanceExistIt = member_->instanceId2PodName.find(instanceID);
if (instanceExistIt == member_->instanceId2PodName.end() ||
instanceExistIt->second != pod->GetMetadata()->GetName()) {
YRLOG_INFO("pod(uid={})(name={}) will be deleted, the bind instance({}) not existing anymore",
pod->GetMetadata()->GetUid(), pod->GetMetadata()->GetName(), instanceID);
return true;
}
return false;
}
auto labels = pod->GetMetadata()->GetLabels();
if (auto it = labels.find(APP_LABEL_KEY); it != labels.end()) {
auto label = it->second;
if (label == FUNCTION_AGENT_DYNAMIC) {
YRLOG_INFO("pod(uid={})(name={}) will be deleted, has no bind instance, but bind function-agent({})",
pod->GetMetadata()->GetUid(), pod->GetMetadata()->GetName(), label);
return true;
}
if (!pod->StatusIsSet() || !pod->GetStatus()->PodIPIsSet() || pod->GetStatus()->GetPodIP().empty()) {
return false;
}
if (label == TruncateIllegalLabel(pod->GetMetadata()->GetName())) {
YRLOG_INFO("pod(uid={})(name={}) will be deleted, has no bind instance, but bind function-agent({})",
pod->GetMetadata()->GetUid(), pod->GetMetadata()->GetName(), label);
return true;
}
}
if (auto iter = labels.find(REUSE_LABEL_KEY); iter != labels.end() && iter->second == "true") {
if (auto it = labels.find(LABEL_SCHEDULE_POLICY); it != labels.end() && it->second == MONOPOLY_SCHEDULE) {
YRLOG_DEBUG("pod({}) is reused, but already schedule monopoly instance, need to delete",
pod->GetMetadata()->GetName());
return true;
}
}
return false;
}
void ScalerActor::OnNodeModified(const K8sEventType &type, const std::shared_ptr<ModelBase> &model)
{
auto nodePtr = dynamic_cast<V1Node *>(&(*model));
if (nodePtr == nullptr) {
return;
}
auto node = std::make_shared<V1Node>(*nodePtr);
auto ip = ParseNodeInternalIP(node);
if (ip.empty()) {
return;
}
if (type == K8sEventType::EVENT_TYPE_DELETE) {
YRLOG_INFO("receive node({}) ip({}) delete event", node->GetMetadata()->GetName(), ip);
(void)member_->nodes.erase(ip);
return;
}
YRLOG_INFO("receive node({}) ip({}) update event", node->GetMetadata()->GetName(), ip);
member_->nodes[ip] = node;
ASSERT_IF_NULL(business_);
business_->MigrateNodeInstanceWithTaints(node);
}
void ScalerActor::OnDeploymentModified(const K8sEventType &type, const std::shared_ptr<ModelBase> &model)
{
auto deploymentPtr = dynamic_cast<V1Deployment *>(&(*model));
if (deploymentPtr == nullptr) {
return;
}
auto deployment = std::make_shared<V1Deployment>(*deploymentPtr);
auto name = deployment->GetMetadata()->GetName();
if (!litebus::strings::StartsWithPrefix(name, POOL_NAME_PREFIX)) {
return;
}
YRLOG_INFO("receive deployment(uid={})(name={}) modify event({})", deployment->GetMetadata()->GetUid(),
deployment->GetMetadata()->GetName(), fmt::underlying(type));
if (type == K8sEventType::EVENT_TYPE_DELETE) {
member_->poolManager->DeleteDeployment(deployment->GetMetadata()->GetName());
} else {
member_->poolManager->PutDeployment(deployment);
}
ASSERT_IF_NULL(business_);
auto poolID = name.substr(POOL_NAME_PREFIX.size());
auto poolInfo = member_->poolManager->GetPodPool(poolID);
auto poolNames = member_->poolManager->GetLoadedResourcePoolNames();
if (type != K8sEventType::EVENT_TYPE_DELETE
&& poolInfo == nullptr && poolNames->find(poolID) == poolNames->end()) {
YRLOG_INFO("receive deployment(uid={})(name={}) modify event({}), need to delete deployment",
deployment->GetMetadata()->GetUid(), name, fmt::underlying(type));
auto poolInfoTmp = std::make_shared<PodPoolInfo>();
poolInfoTmp->id = poolID;
business_->DeletePodPool(poolInfoTmp);
return;
}
if (poolInfo == nullptr) {
return;
}
auto labels = deployment->GetSpec()->GetRTemplate()->GetMetadata()->GetLabels();
if (labels.find(poolInfo->group) == labels.end() || labels[poolInfo->group] != poolInfo->group) {
return;
}
auto status = poolInfo->status;
auto readyCount = poolInfo->readyCount;
auto msg = poolInfo->msg;
if (type == K8sEventType::EVENT_TYPE_DELETE) {
if (status == static_cast<int32_t>(PoolState::DELETED)) {
return;
}
status = static_cast<int32_t>(PoolState::FAILED);
msg = "deployment is deleted";
} else {
readyCount = deployment->GetStatus()->GetAvailableReplicas();
if (!poolInfo->scalable && readyCount >= poolInfo->size) {
status = static_cast<int32_t>(PoolState::RUNNING);
msg = "Running";
}
}
if (status != poolInfo->status || readyCount != poolInfo->readyCount) {
YRLOG_INFO("receive deployment update event, pool({}) status from ({}) to ({})", poolInfo->id, poolInfo->status,
status);
poolInfo->status = status;
poolInfo->readyCount = readyCount;
poolInfo->msg = msg;
business_->PersistencePoolInfo(poolInfo->id);
}
DoScaleUpPodByPoolID(poolInfo->id, true, "", litebus::AID());
}
void ScalerActor::MigrateNodeInstanceWithTaints(const std::shared_ptr<V1Node> node)
{
YRLOG_INFO("MigrateNodeInstanceWithTaints node({})", node->GetMetadata()->GetName());
auto type = CheckNodeFault(node);
if (type == MigrateType::NOT_MIGRATE || member_->isUpgrading) {
return;
}
ProcessNodeFault(node->GetMetadata()->GetName(), type);
}
void ScalerActor::UpdateLocalStatusResponse(const litebus::AID &from, std::string && , std::string &&msg)
{
auto response = std::make_shared<messages::UpdateLocalStatusResponse>();
if (!response->ParseFromString(msg)) {
YRLOG_ERROR("received update local status response from {}, invalid msg {}", std::string(from), msg);
return;
}
auto status = response->status();
if (!response->healthy()) {
YRLOG_ERROR("update local status ({}) request to ({}) failed", status, std::string(from));
(void)DoSendUpdateLocalRequest(from, status);
return;
}
}
void ScalerActor::DoSendUpdateLocalRequest(const litebus::AID &from, const uint32_t &status)
{
auto request = std::make_shared<messages::UpdateLocalStatusRequest>();
request->set_status(status);
(void)Send(from, "UpdateLocalStatus", request->SerializeAsString());
}
void ScalerActor::SendUpdateLocalStatusRequest(const std::shared_ptr<V1Node> node, const bool &hasEvictedTaint)
{
member_->hasEvictedTaint = hasEvictedTaint;
std::string nodeId = node->GetMetadata()->GetName();
auto localSchedAid = litebus::AID(nodeId + LOCAL_SCHED_FUNC_AGENT_MGR_ACTOR_NAME_POSTFIX);
auto ip = ParseNodeInternalIP(node);
if (ip.empty()) {
YRLOG_ERROR("SendLocal parse node({}) ip error", nodeId);
}
if (!member_->localSchedPort.empty()) {
localSchedAid.SetUrl(ip + ":" + member_->localSchedPort);
}
auto status = hasEvictedTaint ? LocalStatusCode::EVICTED : LocalStatusCode::SUCCESS;
(void)DoSendUpdateLocalRequest(localSchedAid, status);
}
MigrateType ScalerActor::CheckNodeFault(const std::shared_ptr<V1Node> node)
{
if (!node->GetSpec()->TaintsIsSet() || node->GetSpec()->GetTaints().empty()) {
litebus::Async(GetAID(), &ScalerActor::SendUpdateLocalStatusRequest, node, false);
return MigrateType::NOT_MIGRATE;
}
auto taints = node->GetSpec()->GetTaints();
bool notInTolerations = false;
bool hasWorkerTaint = false;
bool hasProxyTaint = false;
bool hasEvictedTaint = false;
for (auto iter = taints.begin(); iter != taints.end(); ++iter) {
auto taintKey = (*iter)->GetKey();
if (taintKey == member_->workerTaintKey) {
hasWorkerTaint = true;
} else if (taintKey == member_->proxyTaintKey) {
hasProxyTaint = true;
}
if (!member_->evictedTaintKey.empty() && taintKey == member_->evictedTaintKey) {
hasEvictedTaint = true;
litebus::Async(GetAID(), &ScalerActor::SendUpdateLocalStatusRequest, node, true);
}
if (member_->taintToleranceSet.find(taintKey) == member_->taintToleranceSet.end()) {
YRLOG_INFO("node({}) has taint({}), not in tolerance list", node->GetMetadata()->GetName(), taintKey);
notInTolerations = true;
}
}
if (hasEvictedTaint) {
return MigrateType::MIGRATE_EVICTED;
}
if (hasProxyTaint) {
return MigrateType::MIGRATE_ALL;
}
if (hasWorkerTaint) {
return MigrateType::MIGRATE_GRACEFUL;
}
if (!member_->migrateEnable) {
return MigrateType::NOT_MIGRATE;
}
return notInTolerations ? MigrateType::MIGRATE_UNIQUE : MigrateType::NOT_MIGRATE;
}
void ScalerActor::DoEvictedPodInstanceMigrate(const std::string &podName)
{
auto instanceID = member_->podName2InstanceId[podName];
YRLOG_INFO("DoEvictedPodInstanceMigrate pod({}) instanceId({})", podName, instanceID);
DoPodInstanceGracefulMigrate(podName, instanceID);
}
void ScalerActor::DoPodInstanceMigrate(const std::string &podName, const MigrateType type)
{
if (type == MigrateType::MIGRATE_EVICTED) {
DoEvictedPodInstanceMigrate(podName);
return;
}
if (member_->podName2InstanceId.find(podName) == member_->podName2InstanceId.end()) {
return;
}
auto instanceID = member_->podName2InstanceId[podName];
if (type == MigrateType::MIGRATE_UNIQUE &&
member_->needMigratingInstanceIds.find(instanceID) == member_->needMigratingInstanceIds.end()) {
return;
}
if (member_->migratingPodSet.find(podName) != member_->migratingPodSet.end()) {
return;
}
if (CheckPodHasSpecialize(podName)) {
(void)member_->migratingPodSet.emplace(podName);
if (type == MigrateType::MIGRATE_GRACEFUL &&
member_->systemUpgradeParam.handlers.evictAgentHandler != nullptr) {
YRLOG_INFO("process node fault, start to graceful delete pod({}) instance({})", podName, instanceID);
DoPodInstanceGracefulMigrate(podName, instanceID);
return;
}
YRLOG_INFO("process node fault, start to delete pod({}) instance({})", podName, instanceID);
(void)DoDeletePod(podName, false);
}
}
void ScalerActor::DoPodInstanceGracefulMigrate(const std::string &podName, const std::string &instanceID)
{
auto iter = member_->podNameMap.find(podName);
if (iter == member_->podNameMap.end()) {
YRLOG_WARN("failed to migrate pod({}) instance({}), pod not found", podName, instanceID);
return;
}
auto pod = iter->second;
auto labels = pod->GetMetadata()->GetLabels();
auto hostIp = pod->GetStatus()->GetHostIP();
auto proxyID = member_->nodes[hostIp]->GetMetadata()->GetName();
YRLOG_DEBUG("migrate pod({}) instance({}) proxyId({})", podName, instanceID, proxyID);
auto evictReq = std::make_shared<messages::EvictAgentRequest>();
evictReq->set_timeoutsec(UINT32_MAX);
evictReq->set_agentid(podName);
member_->systemUpgradeParam.handlers.evictAgentHandler(proxyID, evictReq)
.OnComplete([aid(GetAID()), pod, instanceID](litebus::Future<Status> f) {
litebus::Async(aid, &ScalerActor::RemoveEvictAnnotation, pod, instanceID);
});
}
void ScalerActor::RemoveEvictAnnotation(const std::shared_ptr<V1Pod> pod, const std::string &instanceID)
{
auto annotations = pod->GetMetadata()->GetAnnotations();
auto iter = annotations.find(EVICT_ANNOTATION_KEY);
if (annotations.empty() || iter == annotations.end()) {
YRLOG_WARN("pod ({}) has no annotation ({})", pod->GetMetadata()->GetName(), EVICT_ANNOTATION_KEY);
return;
}
auto newPod = std::make_shared<V1Pod>();
(void)newPod->FromJson(pod->ToJson());
newPod->GetMetadata()->GetAnnotations().erase(EVICT_ANNOTATION_KEY);
PatchPod(pod->GetMetadata()->GetName(), pod->GetMetadata()->GetRNamespace(), pod, newPod,
instanceID.empty() ? pod->GetMetadata()->GetName() : instanceID);
}
bool ScalerActor::CheckPodHasSpecialize(const std::string &podName)
{
auto iter = member_->podNameMap.find(podName);
if (iter == member_->podNameMap.end()) {
return false;
}
auto pod = iter->second;
if (!pod->StatusIsSet() || !pod->GetStatus()->PodIPIsSet() || pod->GetStatus()->GetPodIP().empty()) {
return false;
}
auto labels = pod->GetMetadata()->GetLabels();
auto it = labels.find(APP_LABEL_KEY);
if (it == labels.end()) {
return false;
}
auto label = it->second;
return it->second == TruncateIllegalLabel(pod->GetMetadata()->GetName());
}
void ScalerActor::ProcessNodeFault(const std::string &nodeName, const MigrateType type)
{
if (member_->nodeIdPodNameMap.find(nodeName) == member_->nodeIdPodNameMap.end()) {
return;
}
if (type == MigrateType::MIGRATE_UNIQUE && member_->needMigratingInstanceIds.empty()) {
return;
}
auto podNames = member_->nodeIdPodNameMap[nodeName];
for (auto name : podNames) {
DoPodInstanceMigrate(name, type);
}
}
std::shared_ptr<V1Deployment> ScalerActor::GenerateDeploymentByOrigin(const ResourcePool &pool,
std::shared_ptr<V1Deployment> originalDeployment)
{
originalDeployment->GetSpec()->SetReplicas(pool.isScalable ? 0 : pool.poolSize);
auto containers = originalDeployment->GetSpec()->GetRTemplate()->GetSpec()->GetContainers();
for (auto &container : containers) {
if (container->GetName() == RUNTIME_MANAGER_CONTAINER_NAME) {
UpdateContainerResource(container, pool);
SetResourceEnvs(container, pool);
}
}
return originalDeployment;
}
void ScalerActor::UpdateContainerResource(const std::shared_ptr<V1Container> &container, const ResourcePool &pool)
{
auto resources = std::make_shared<V1ResourceRequirements>();
std::map<std::string, std::string> requests;
std::map<std::string, std::string> limits;
for (const auto &it : pool.requestResources) {
requests[it.first] = it.second;
if (pool.limitResources.find(it.first) == pool.limitResources.end()) {
limits[it.first] = it.second;
}
}
for (const auto &it : pool.limitResources) {
limits[it.first] = it.second;
}
resources->SetRequests(requests);
resources->SetLimits(limits);
container->SetResources(std::move(resources));
}
messages::CreateAgentResponse ScalerActor::GenCreateAgentResponse(const std::string &requestID, const int32_t code,
const std::string &message)
{
messages::CreateAgentResponse target;
target.set_requestid(requestID);
target.set_code(code);
target.set_message(message);
return target;
}
messages::CreateAgentResponse ScalerActor::InitCreateAgentResponse(int32_t code, const std::string &message,
const messages::CreateAgentRequest &source)
{
auto target = GenCreateAgentResponse(source.instanceinfo().requestid(), code, message);
for (auto &iter : source.instanceinfo().createoptions()) {
(*target.mutable_updatedcreateoptions())[iter.first] = iter.second;
}
return target;
}
void ScalerActor::SendCreateAgentFailedResponse(const litebus::AID &from, int32_t code, const std::string &message,
const messages::CreateAgentRequest &source)
{
(void)member_->creatingRequests.erase(source.instanceinfo().requestid());
(void)Send(from, "CreateAgentResponse", InitCreateAgentResponse(code, message, source).SerializeAsString());
}
std::map<std::string, std::string> ScalerActor::CheckAndBuildResourcesMap(const ::resources::InstanceInfo &instanceInfo)
{
std::map<std::string, std::string> resourcesMap;
const auto &cpuIter = instanceInfo.resources().resources().find(SCHEDULER_CPU_RESOURCE);
const auto &memoryIter = instanceInfo.resources().resources().find(SCHEDULER_MEMORY_RESOURCE);
if (cpuIter == instanceInfo.resources().resources().end() ||
memoryIter == instanceInfo.resources().resources().end()) {
YRLOG_WARN("{}|cpu or memory is none in instance({})", instanceInfo.requestid(), instanceInfo.instanceid());
return resourcesMap;
}
resourcesMap[CPU_RESOURCE] = std::to_string(static_cast<int32_t>(cpuIter->second.scalar().value())) + "m";
resourcesMap[MEMORY_RESOURCE] = std::to_string(static_cast<int32_t>(memoryIter->second.scalar().value())) + "Mi";
const auto &storageIter = instanceInfo.resources().resources().find(EPHEMERAL_STORAGE_RESOURCE);
if (storageIter != instanceInfo.resources().resources().end()) {
resourcesMap[EPHEMERAL_STORAGE_RESOURCE] =
std::to_string(static_cast<int32_t>(storageIter->second.scalar().value())) + "Mi";
}
for (const auto &it : instanceInfo.resources().resources()) {
if (it.first == SCHEDULER_CPU_RESOURCE || it.first == SCHEDULER_MEMORY_RESOURCE ||
it.first == EPHEMERAL_STORAGE_RESOURCE || it.second.type() != resources::Value::SCALAR) {
continue;
}
resourcesMap[it.first] = std::to_string(int32_t(it.second.scalar().value()));
}
return resourcesMap;
}
void ScalerActor::CreateAgent(const litebus::AID &from, std::string &&name, std::string &&msg)
{
ASSERT_IF_NULL(business_);
business_->CreateAgent(from, std::move(name), std::move(msg));
}
void ScalerActor::DeletePod(const litebus::AID &from, std::string &&name, std::string &&msg)
{
ASSERT_IF_NULL(business_);
business_->DeletePod(from, std::move(name), std::move(msg));
}
void ScalerActor::DeletePodByName(const std::string &podName)
{
ASSERT_IF_NULL(business_);
business_->DeletePod(podName);
}
void ScalerActor::UpdateNodeTaint(const litebus::AID &from, std::string &&name, std::string &&msg)
{
ASSERT_IF_NULL(business_);
business_->UpdateNodeTaint(from, std::move(name), std::move(msg));
}
ResourcePool ScalerActor::BuildResourcePool(const ::resources::InstanceInfo &instanceInfo,
const std::map<std::string, std::string> &resourcesMap)
{
ResourcePool pool;
pool.name = GeneratePoolNameByResources(instanceInfo.instanceid(), resourcesMap);
pool.poolSize = 1;
pool.requestResources = resourcesMap;
pool.limitResources = resourcesMap;
if (pool.requestResources.find(EPHEMERAL_STORAGE_RESOURCE) != pool.requestResources.end()) {
pool.requestResources[EPHEMERAL_STORAGE_RESOURCE] = DEFAULT_EPHEMERAL_STORAGE_VALUE;
}
for (const auto &ns : instanceInfo.scheduleoption().nodeselector()) {
pool.nodeSelector[ns.first] = ns.second;
}
if (instanceInfo.has_scheduleoption() && !instanceInfo.scheduleoption().resourceselector().empty()) {
nlohmann::json map;
for (auto &it : instanceInfo.scheduleoption().resourceselector()) {
map[it.first] = it.second;
}
pool.initLabels = map.dump();
}
return pool;
}
void ScalerActor::HandleSharedInstancePut(const resource_view::InstanceInfo &instanceInfo)
{
auto podName = instanceInfo.functionagentid();
if (podName.empty()) {
return;
}
YRLOG_DEBUG("{}|instance({}) is not monopoly schedule policy, but has delegate pod labels",
instanceInfo.requestid(), instanceInfo.instanceid());
if (auto iter = member_->instanceId2PodName.find(instanceInfo.instanceid());
iter == member_->instanceId2PodName.end()) {
YRLOG_DEBUG("{}|shared instance({}) is saved, pod name({})", instanceInfo.requestid(),
instanceInfo.instanceid(), podName);
member_->instanceId2PodName[instanceInfo.instanceid()] = podName;
}
ASSERT_IF_NULL(business_);
business_->UpdatePodLabels(instanceInfo);
}
void ScalerActor::HandleInstancePut(const InstanceInfo &instanceInfo)
{
if (instanceInfo.scheduleoption().schedpolicyname() != MONOPOLY_SCHEDULE) {
HandleSharedInstancePut(instanceInfo);
return;
}
if (auto iter = member_->instanceId2PodName.find(instanceInfo.instanceid());
iter == member_->instanceId2PodName.end()) {
auto podName = instanceInfo.functionagentid();
if (podName != "") {
YRLOG_DEBUG("{}|instance({}) is saved, pod name({})", instanceInfo.requestid(), instanceInfo.instanceid(),
podName);
member_->instanceId2PodName[instanceInfo.instanceid()] = podName;
member_->podName2InstanceId[podName] = instanceInfo.instanceid();
}
}
CheckInstanceNeedMigrate(instanceInfo);
if (IsNeedUpdateLabel(instanceInfo.instancestatus().code())) {
ASSERT_IF_NULL(business_);
business_->UpdatePodLabels(instanceInfo);
return;
} else if (instanceInfo.instancestatus().code() == static_cast<int32_t>(InstanceState::FATAL) ||
instanceInfo.instancestatus().code() == static_cast<int32_t>(InstanceState::SCHEDULE_FAILED) ||
instanceInfo.instancestatus().code() == static_cast<int32_t>(InstanceState::EVICTED)) {
if (deletingPodSet_.find(instanceInfo.instanceid()) != deletingPodSet_.end()) {
YRLOG_DEBUG("{}|pod for instance({}) is deleting, skipped", instanceInfo.requestid(),
instanceInfo.instanceid());
return;
}
if (instanceInfo.functionproxyid() == INSTANCE_MANAGER_OWNER) {
YRLOG_INFO("{}|instance({}) belongs to instance-manager, skip delete pod", instanceInfo.requestid(),
instanceInfo.instanceid());
return;
}
(void)deletingPodSet_.emplace(instanceInfo.instanceid());
std::string podName;
if (auto iter = member_->instanceId2PodName.find(instanceInfo.instanceid());
iter != member_->instanceId2PodName.end()) {
podName = iter->second;
}
ASSERT_IF_NULL(business_);
business_->DeletePod(podName);
return;
} else {
YRLOG_DEBUG("{}|instance({}) status({}) don't need to update agent labels or delete pod",
instanceInfo.requestid(), instanceInfo.instanceid(), instanceInfo.instancestatus().code());
}
}
void ScalerActor::CheckInstanceNeedMigrate(const ::resources::InstanceInfo &instanceInfo)
{
if (!member_->migrateEnable && member_->migrateResourcePrefix.empty()) {
return;
}
if (instanceInfo.instancestatus().code() != static_cast<int32_t>(InstanceState::CREATING) ||
instanceInfo.instancestatus().code() != static_cast<int32_t>(InstanceState::RUNNING)) {
return;
}
auto resource = instanceInfo.resources().resources();
for (auto item : resource) {
if (item.first.find(member_->migrateResourcePrefix) == item.first.npos) {
continue;
}
YRLOG_INFO("instance({}) resource has migrate prefix", instanceInfo.instanceid());
(void)member_->needMigratingInstanceIds.emplace(instanceInfo.instanceid());
}
}
bool ScalerActor::IsNeedUpdateLabel(int32_t code)
{
return code == static_cast<int32_t>(InstanceState::CREATING) ||
code == static_cast<int32_t>(InstanceState::RUNNING) ||
code == static_cast<int32_t>(InstanceState::EXITING) ||
code == static_cast<int32_t>(InstanceState::SUB_HEALTH);
}
void ScalerActor::HandleSharedInstanceDelete(const resource_view::InstanceInfo &instanceInfo)
{
(void)member_->instanceId2PodName.erase(instanceInfo.instanceid());
if (instanceInfo.createoptions().find(DELEGATE_POD_LABELS) == instanceInfo.createoptions().end()) {
return;
}
YRLOG_DEBUG("{}|instance({}) is not monopoly schedule policy, but has delegate pod labels",
instanceInfo.requestid(), instanceInfo.instanceid());
ASSERT_IF_NULL(business_);
business_->UpdatePodLabels(instanceInfo, true);
}
void ScalerActor::HandleInstanceDelete(const InstanceInfo &instanceInfo)
{
if (instanceInfo.scheduleoption().schedpolicyname() != MONOPOLY_SCHEDULE) {
HandleSharedInstanceDelete(instanceInfo);
return;
}
std::string podName = "";
if (auto iter = member_->instanceId2PodName.find(instanceInfo.instanceid());
iter != member_->instanceId2PodName.end()) {
podName = iter->second;
}
(void)member_->instanceId2PodName.erase(instanceInfo.instanceid());
(void)member_->needMigratingInstanceIds.erase(instanceInfo.instanceid());
if (deletingPodSet_.find(instanceInfo.instanceid()) != deletingPodSet_.end()) {
YRLOG_WARN("{}|pod for instance({}) is deleting, skipped", instanceInfo.requestid(), instanceInfo.instanceid());
(void)deletingPodSet_.erase(instanceInfo.instanceid());
return;
}
ASSERT_IF_NULL(business_);
business_->DeletePod(podName);
}
void ScalerActor::UpdatePodLabelsWithoutInstance(const std::shared_ptr<V1Pod> &pod)
{
auto annotations = pod->GetMetadata()->GetAnnotations();
if (annotations.empty()) {
return;
}
pod->GetMetadata()->GetAnnotations()[ANNOTATION_PLACEHOLDER] = ANNOTATION_PLACEHOLDER;
auto newPod = std::make_shared<V1Pod>();
auto originalPodJson = pod->ToJson();
auto podName = pod->GetMetadata()->GetName();
(void)newPod->FromJson(originalPodJson);
bool needUpdate = false;
for (auto &annotation : annotations) {
auto key = annotation.first;
if (!litebus::strings::StartsWithPrefix(key, ANNOTATION_KEY_PREFIX)) {
continue;
}
auto instanceID = key.substr(ANNOTATION_KEY_PREFIX.size());
if (member_->instanceId2PodName.find(instanceID) == member_->instanceId2PodName.end()) {
YRLOG_INFO("instance({}) is not found, start to remove labels for pod({})", instanceID,
pod->GetMetadata()->GetName());
RemovePodLabelsByAnnotations(instanceID, newPod);
needUpdate = true;
}
}
if (!needUpdate) {
return;
}
PatchPod(podName, pod->GetMetadata()->GetRNamespace(), pod, newPod, "");
}
void ScalerActor::UpdatePodLabels(const litebus::Future<std::shared_ptr<V1Pod>> &pod, const InstanceInfo &instanceInfo,
bool isInstanceDelete)
{
std::string podName = instanceInfo.functionagentid();
if (pod.IsError()) {
YRLOG_ERROR("{}|failed to find pod for instance({}), failed to update labels", instanceInfo.requestid(),
instanceInfo.instanceid());
TriggerUpdateLabel(instanceInfo.instanceid(), nullptr);
return;
}
auto latestPod = pod.Get();
if (member_->podNameMap.find(podName) == member_->podNameMap.end()) {
member_->podNameMap[podName] = latestPod;
}
if (!latestPod->GetMetadata()->AnnotationsIsSet()) {
latestPod->GetMetadata()->SetAnnotations({});
}
latestPod->GetMetadata()->GetAnnotations()[ANNOTATION_PLACEHOLDER] = ANNOTATION_PLACEHOLDER;
auto newPod = std::make_shared<V1Pod>();
(void)newPod->FromJson(latestPod->ToJson());
if (!newPod->GetMetadata()->LabelsIsSet()) {
newPod->GetMetadata()->SetLabels({});
}
if (instanceInfo.scheduleoption().schedpolicyname() == MONOPOLY_SCHEDULE) {
AddLabels(instanceInfo, newPod->GetMetadata()->GetLabels());
} else {
HandlePodLabelsWithAnnotation(instanceInfo, newPod, isInstanceDelete);
}
newPod->GetMetadata()->GetAnnotations()[EVICT_ANNOTATION_KEY] = "false";
PatchPod(latestPod->GetMetadata()->GetName(), latestPod->GetMetadata()->GetRNamespace(), latestPod, newPod,
instanceInfo.instanceid());
}
void ScalerActor::PatchPod(const std::string &podName, const std::string &nameSpace,
const std::shared_ptr<V1Pod> &source, const std::shared_ptr<V1Pod> &target,
const std::string &instanceID)
{
PatchPodWithOwnerReference(source, target);
nlohmann::json podDiffPatch = nlohmann::json::diff(source->ToJson(), target->ToJson());
std::shared_ptr<kube_client::model::Object> body = std::make_shared<kube_client::model::Object>();
bool isSet = body->FromJson(podDiffPatch);
if (!isSet) {
YRLOG_WARN("failed to set object json from patch({}) for pod({}), skip", podDiffPatch.dump(), podName);
litebus::Async(GetAID(), &ScalerActor::TriggerUpdateLabel, instanceID, nullptr);
return;
}
YRLOG_INFO("patch labels for pod({})", podName);
(void)member_->kubeClient->PatchNamespacedPod(podName, nameSpace, body)
.OnComplete([podName, instanceID, body, aid(GetAID())](const litebus::Future<std::shared_ptr<V1Pod>> &pod) {
if (pod.IsError()) {
YRLOG_ERROR("failed to patch labels for pod({}), body({}) error code({})", podName,
body->ToJson().dump(), pod.GetErrorCode());
litebus::Async(aid, &ScalerActor::TriggerUpdateLabel, instanceID, nullptr);
return;
}
litebus::Async(aid, &ScalerActor::TriggerUpdateLabel, instanceID, pod.Get());
YRLOG_INFO("success to patch labels for pod({})", podName);
});
}
void ScalerActor::PatchPodWithOwnerReference(const std::shared_ptr<V1Pod> &source, const std::shared_ptr<V1Pod> &target)
{
if (member_->kubeClient->GetOwnerReference() == nullptr ||
member_->kubeClient->GetOwnerReference()->GetUid().empty()) {
return;
}
if (source->GetMetadata()->GetLabels().find(APP_LABEL_KEY) == source->GetMetadata()->GetLabels().end() ||
target->GetMetadata()->GetLabels().find(APP_LABEL_KEY) == target->GetMetadata()->GetLabels().end()) {
return;
}
std::string sourceApp = source->GetMetadata()->GetLabels()[APP_LABEL_KEY];
std::string targetApp = target->GetMetadata()->GetLabels()[APP_LABEL_KEY];
if (sourceApp != targetApp) {
YRLOG_DEBUG("patch pod change pod({}) owner reference", target->GetMetadata()->GetName());
target->GetMetadata()->SetOwnerReferences({ member_->kubeClient->GetOwnerReference() });
}
}
litebus::Future<std::shared_ptr<V1Pod>> ScalerActor::ReadNamespacedPod(const std::string &rNamespace,
const std::string &podName)
{
return member_->kubeClient->ReadNamespacedPod(member_->k8sNamespace, podName);
}
void ScalerActor::UpdatePodLabelsWithLatestPod(const InstanceInfo &instanceInfo, bool isInstanceDelete,
const std::shared_ptr<V1Pod> &pod)
{
if (pod != nullptr) {
UpdatePodLabels(pod, instanceInfo, isInstanceDelete);
return;
}
std::string podName = instanceInfo.functionagentid();
(void)ReadNamespacedPod(member_->k8sNamespace, podName)
.OnComplete(litebus::Defer(GetAID(), &ScalerActor::UpdatePodLabels, std::placeholders::_1, instanceInfo,
isInstanceDelete));
}
Status ScalerActor::Register()
{
auto watchOpt = WatchOption{ true, true };
YRLOG_INFO("register watch {}", POD_POOL_PREFIX);
auto observer = [aid(GetAID())](const std::vector<WatchEvent> &events, bool) {
litebus::Async(aid, &ScalerActor::UpdatePodPoolEvent, events);
return true;
};
auto systemUpgradeSyncer =
[aid(GetAID())](const std::shared_ptr<GetResponse> &getResponse) -> litebus::Future<SyncResult> {
return litebus::Async(aid, &ScalerActor::SystemUpgradeSyncer, getResponse);
};
auto instanceInfoSyncer =
[aid(GetAID())](const std::shared_ptr<GetResponse> &getResponse) -> litebus::Future<SyncResult> {
return litebus::Async(aid, &ScalerActor::InstanceInfoSyncer, getResponse);
};
auto podPoolSyncer =
[aid(GetAID())](const std::shared_ptr<GetResponse> &getResponse) -> litebus::Future<SyncResult> {
return litebus::Async(aid, &ScalerActor::PodPoolSyncer, getResponse);
};
(void)member_->metaStorageAccessor->RegisterObserver(POD_POOL_PREFIX, watchOpt, observer, podPoolSyncer);
if (member_->systemUpgradeParam.isEnabled && member_->systemUpgradeParam.systemUpgradeWatcher != nullptr) {
YRLOG_INFO("register watch upgrade switch, key:{}", member_->systemUpgradeParam.systemUpgradeKey);
(void)member_->systemUpgradeParam.systemUpgradeWatcher->RegisterObserver(
member_->systemUpgradeParam.systemUpgradeKey, watchOpt,
[aid(GetAID())](const std::vector<WatchEvent> &events, bool) {
litebus::Async(aid, &ScalerActor::UpdateSystemUpgradeSwitchEvent, events);
return true;
}, systemUpgradeSyncer);
}
YRLOG_INFO("register watch {} ", INSTANCE_PATH_PREFIX);
(void)member_->metaStorageAccessor->RegisterObserver(
INSTANCE_PATH_PREFIX, watchOpt, [aid(GetAID())](const std::vector<WatchEvent> &events, bool) {
litebus::Async(aid, &ScalerActor::UpdateInstanceEvent, events);
return true;
}, instanceInfoSyncer);
return Status::OK();
}
void ScalerActor::UpdateInstanceEvent(const std::vector<WatchEvent> &events)
{
for (const auto &event : events) {
auto eventKey = TrimKeyPrefix(event.kv.key(), member_->metaStorageAccessor->GetMetaClient()->GetTablePrefix());
auto instanceID = GetInstanceID(eventKey);
switch (event.eventType) {
case EVENT_TYPE_PUT: {
resource_view::InstanceInfo instanceInfo;
if (!TransToInstanceInfoFromJson(instanceInfo, event.kv.value())) {
YRLOG_ERROR("failed to trans to instanceInfo from json string, instanceID: {}", instanceID);
break;
}
if (instanceInfo.lowreliability()) {
break;
}
YRLOG_INFO(
"{}|receive instance put event, instanceID({}), runtime({}), proxy({}), agent({}), parent({}), "
"group({}),status({}), reason({})",
instanceInfo.requestid(), instanceID, instanceInfo.runtimeid(), instanceInfo.functionproxyid(),
instanceInfo.functionagentid(), instanceInfo.parentid(), instanceInfo.groupid(),
instanceInfo.instancestatus().code(), instanceInfo.instancestatus().msg());
HandleInstancePut(instanceInfo);
break;
}
case EVENT_TYPE_DELETE: {
resource_view::InstanceInfo instanceInfo;
if (!TransToInstanceInfoFromJson(instanceInfo, event.prevKv.value())) {
YRLOG_ERROR("failed to trans to instanceInfo from json string, instanceID: {}", instanceID);
break;
}
if (instanceInfo.lowreliability()) {
break;
}
YRLOG_INFO("{}|receive instance delete event, instanceID({})", instanceInfo.requestid(), instanceID);
HandleInstanceDelete(instanceInfo);
break;
}
default: {
YRLOG_WARN("unknown event type {}", fmt::underlying(event.eventType));
}
}
}
}
void ScalerActor::UpdatePodPoolEvent(const std::vector<WatchEvent> &events)
{
for (const auto &event : events) {
auto key = TrimKeyPrefix(event.kv.key(), member_->metaStorageAccessor->GetMetaClient()->GetTablePrefix());
auto poolID = GetPodPoolID(key);
switch (event.eventType) {
case EVENT_TYPE_PUT: {
YRLOG_INFO("receive pod pool info put event, poolID({})", poolID);
auto podPool = member_->poolManager->GetOrNewPool(poolID);
PoolManager::TransToPoolInfoFromJson(event.kv.value(), podPool);
auto status = podPool->status;
ASSERT_IF_NULL(business_);
if (status == static_cast<int32_t>(PoolState::NEW)) {
business_->CreatePodPool(podPool);
} else if (status == static_cast<int32_t>(PoolState::UPDATE)) {
business_->UpdatePodPool(podPool);
} else if (status == static_cast<int32_t>(PoolState::DELETED)) {
business_->DeletePodPool(podPool);
}
break;
}
case EVENT_TYPE_DELETE: {
YRLOG_INFO("receive pod pool info delete event, poolID({})", poolID);
member_->poolManager->DeletePodPoolInfo(poolID);
break;
}
default: {
YRLOG_WARN("unknown event type {}", fmt::underlying(event.eventType));
}
}
}
}
void ScalerActor::UpdateSystemUpgradeSwitchEvent(const std::vector<WatchEvent> &events)
{
for (const auto &event : events) {
auto key = TrimKeyPrefix(event.kv.key(), member_->metaStorageAccessor->GetMetaClient()->GetTablePrefix());
switch (event.eventType) {
case EVENT_TYPE_PUT: {
YRLOG_INFO("receive system upgrade switch put event, key({})", key);
auto upgradeInfo = TransToUpgradeInfoFromJson(event.kv.value());
if (upgradeInfo == nullptr) {
YRLOG_ERROR("failed to trans system upgrade switch info, key: {}", key);
break;
}
HandleSystemUpgrade(upgradeInfo);
if (member_->systemUpgradeParam.handlers.systemUpgradeHandler != nullptr) {
member_->systemUpgradeParam.handlers.systemUpgradeHandler(member_->isUpgrading);
}
break;
}
case EVENT_TYPE_DELETE: {
YRLOG_INFO("receive system upgrade switch delete event, key({})", key);
auto upgradeInfo = TransToUpgradeInfoFromJson(event.prevKv.value());
if (upgradeInfo == nullptr) {
YRLOG_ERROR("failed to trans system upgrade switch info, prevKv: {}", key);
break;
}
if (upgradeInfo->azID != member_->systemUpgradeParam.azID && upgradeInfo->azID != DEFAULT_AZ_ID) {
YRLOG_DEBUG("ignore az({}) system upgrade switch events, we are in az({})", upgradeInfo->azID,
member_->systemUpgradeParam.azID);
return;
}
YRLOG_INFO("key is deleted, system status change to status(2) updated");
member_->isUpgrading = false;
OnSystemUpgradeFinished();
if (member_->systemUpgradeParam.handlers.systemUpgradeHandler != nullptr) {
member_->systemUpgradeParam.handlers.systemUpgradeHandler(member_->isUpgrading);
}
break;
}
default: {
YRLOG_WARN("unknown system upgrade switch event type {}", fmt::underlying(event.eventType));
}
}
}
}
void ScalerActor::HandleSystemUpgrade(std::shared_ptr<UpgradeInfo> upgradeInfo)
{
if (upgradeInfo->azID != member_->systemUpgradeParam.azID && upgradeInfo->azID != DEFAULT_AZ_ID) {
YRLOG_DEBUG("ignore az({}) system upgrade switch events, we are in az({})", upgradeInfo->azID,
member_->systemUpgradeParam.azID);
return;
}
if (upgradeInfo->status == SYSTEM_STATUS_UPGRADING) {
YRLOG_INFO("system status change to status({}) updating", upgradeInfo->status);
member_->isUpgrading = true;
} else if (upgradeInfo->status == SYSTEM_STATUS_UPGRADED) {
YRLOG_INFO("system status change to status({}) updated", upgradeInfo->status);
member_->isUpgrading = false;
OnSystemUpgradeFinished();
} else {
YRLOG_WARN("try to update system upgrade status with unknown status({})", upgradeInfo->status);
}
}
void ScalerActor::OnSystemUpgradeFinished()
{
for (const auto &node : member_->nodes) {
if (!node.second->SpecIsSet() || !node.second->GetSpec()->TaintsIsSet()) {
YRLOG_DEBUG("node({})'s taint is not set, ignore", node.first);
continue;
}
auto taints = node.second->GetSpec()->GetTaints();
for (const auto &taint : taints) {
if (!taint->KeyIsSet()) {
continue;
}
if (taint->GetKey() == member_->proxyTaintKey &&
member_->systemUpgradeParam.handlers.localSchedFaultHandler != nullptr) {
if (member_->systemUpgradeParam.handlers.systemUpgradeHandler != nullptr) {
member_->systemUpgradeParam.handlers.systemUpgradeHandler(member_->isUpgrading);
}
member_->systemUpgradeParam.handlers.localSchedFaultHandler(node.second->GetMetadata()->GetName());
}
if (taint->GetKey() == member_->proxyTaintKey || taint->GetKey() == member_->workerTaintKey ||
(!member_->evictedTaintKey.empty() && taint->GetKey() == member_->evictedTaintKey)) {
YRLOG_INFO("node({}) still is unhealthy after upgrading, taint({})", node.first, taint->GetKey());
MigrateNodeInstanceWithTaints(node.second);
}
}
}
}
void ScalerActor::UpdateLeaderInfo(const explorer::LeaderInfo &leaderInfo)
{
litebus::AID masterAID(SCALER_ACTOR, leaderInfo.address);
auto newStatus = leader::GetStatus(GetAID(), masterAID, curStatus_);
if (businesses_.find(newStatus) == businesses_.end()) {
YRLOG_WARN("new status({}) business don't exist", newStatus);
return;
}
business_ = businesses_[newStatus];
ASSERT_IF_NULL(business_);
business_->OnChange();
curStatus_ = newStatus;
}
void ScalerActor::CreatePodPool(const std::shared_ptr<PodPoolInfo> &podPool)
{
auto resourcePool = member_->poolManager->GenerateResourcePool(podPool);
if (!podPool->horizontalPodAutoscalerSpec.empty()) {
YRLOG_DEBUG("Start to create HPA for pool({})", resourcePool.name);
auto scaler = GenerateV2HorizontalPodAutoscaler(resourcePool, member_->k8sNamespace);
if (scaler == nullptr) {
YRLOG_DEBUG("{}|Generate HPA failed.", podPool->id);
podPool->status = static_cast<int32_t>(PoolState::FAILED);
podPool->msg = "invalid HPA spec, create HPA failed";
litebus::Async(GetAID(), &ScalerActor::DoPersistencePoolInfo, podPool->id);
return;
}
member_->kubeClient->ListNamespacedHorizontalPodAutoscaler(member_->k8sNamespace, false)
.OnComplete([podPool, resourcePool, aid(GetAID())]
(const litebus::Future<std::shared_ptr<V2HorizontalPodAutoscalerList>> &list) {
if (list.IsError()) {
YRLOG_ERROR("{}|CreatePodPool List HPA failed.", podPool->id);
podPool->status = static_cast<int32_t>(PoolState::FAILED);
podPool->msg = "list HPA failed";
litebus::Async(aid, &ScalerActor::DoPersistencePoolInfo, podPool->id);
return;
}
})
.Then(litebus::Defer(GetAID(), &ScalerActor::CreateHPA, podPool, resourcePool, std::placeholders::_1))
.OnComplete([podPool, resourcePool, aid(GetAID())]
(const litebus::Future<std::shared_ptr<V2HorizontalPodAutoscaler>> &scaler) {
if (scaler.IsError()) {
YRLOG_ERROR("{}|Create HPA failed.", podPool->id);
podPool->status = static_cast<int32_t>(PoolState::FAILED);
podPool->msg = "create HPA failed";
litebus::Async(aid, &ScalerActor::DoPersistencePoolInfo, podPool->id);
return;
}
litebus::Async(aid, &ScalerActor::CreatePodPoolWithResourcePool, podPool, resourcePool);
});
return;
}
return CreatePodPoolWithResourcePool(podPool, resourcePool);
}
litebus::Future<std::shared_ptr<V2HorizontalPodAutoscaler>> ScalerActor::CreateHPA(
const std::shared_ptr<PodPoolInfo> &podPool,
const ResourcePool &pool,
const std::shared_ptr<V2HorizontalPodAutoscalerList> &list)
{
auto promise = litebus::Promise<std::shared_ptr<V2HorizontalPodAutoscaler>>();
for (auto item : list->GetItems()) {
if (item->GetMetadata()->GetName() == pool.horizontalPodAutoscaler->GetMetadata()->GetName()) {
YRLOG_ERROR("failed to create hpa({}), HPA exists", item->GetMetadata()->GetName());
promise.SetFailed(static_cast<int32_t>(StatusCode::FAILED));
return promise.GetFuture();
}
}
YRLOG_INFO("start to create hpa {}", pool.horizontalPodAutoscaler->GetMetadata()->GetName());
return member_->kubeClient->CreateNamespacedHorizontalPodAutoscaler(
member_->k8sNamespace, pool.horizontalPodAutoscaler);
}
void ScalerActor::CreatePodPoolWithResourcePool(
const std::shared_ptr<PodPoolInfo> &podPool, const ResourcePool &resourcePool)
{
YRLOG_DEBUG("Start to create pod pool({})", podPool->id);
if (member_->templateDeployment == nullptr) {
YRLOG_ERROR("failed to create pod pool({}), function-agent template not found", podPool->id);
podPool->status = static_cast<int32_t>(PoolState::FAILED);
podPool->msg = "function-agent template not found";
DoPersistencePoolInfo(podPool->id);
return;
}
auto deployment = GenerateDeploymentByTemplate(resourcePool, member_->templateDeployment);
auto name = deployment->GetMetadata()->GetName();
if (member_->poolManager->GetDeployment(name) != nullptr) {
YRLOG_ERROR("failed to create pod pool({}), deployment exists", podPool->id);
podPool->status = static_cast<int32_t>(PoolState::FAILED);
podPool->msg = "pool already exists";
DoPersistencePoolInfo(podPool->id);
return;
}
podPool->status = static_cast<int32_t>(PoolState::CREATING);
podPool->msg = "Creating";
podPool->readyCount = 0;
(void)member_->kubeClient->CreateNamespacedDeployment(member_->k8sNamespace, deployment)
.OnComplete(litebus::Defer(GetAID(), &ScalerActor::OnCreatePodPoolComplete, std::placeholders::_1, podPool));
}
void ScalerActor::UpdatePodPool(const std::shared_ptr<PodPoolInfo> &podPool)
{
auto resourcePool = member_->poolManager->GenerateResourcePool(podPool);
if (!podPool->horizontalPodAutoscalerSpec.empty()) {
YRLOG_DEBUG("Start to update HPA for pool({})", resourcePool.name);
auto scaler = GenerateV2HorizontalPodAutoscaler(resourcePool, member_->k8sNamespace);
if (scaler == nullptr) {
YRLOG_ERROR("{}|Generate HPA failed.", podPool->id);
podPool->status = static_cast<int32_t>(PoolState::FAILED);
podPool->msg = "invalid HPA spec, update HPA failed";
litebus::Async(GetAID(), &ScalerActor::DoPersistencePoolInfo, podPool->id);
return;
}
member_->kubeClient->ListNamespacedHorizontalPodAutoscaler(member_->k8sNamespace, false)
.OnComplete([podPool, resourcePool, aid(GetAID())]
(const litebus::Future<std::shared_ptr<V2HorizontalPodAutoscalerList>> &list) {
if (list.IsError()) {
YRLOG_ERROR("{}|UpdatePodPool List HPA failed.", podPool->id);
podPool->status = static_cast<int32_t>(PoolState::FAILED);
podPool->msg = "list HPA failed";
litebus::Async(aid, &ScalerActor::DoPersistencePoolInfo, podPool->id);
return;
}
})
.Then(litebus::Defer(GetAID(), &ScalerActor::UpdateHPA, podPool, resourcePool, std::placeholders::_1))
.OnComplete([aid(GetAID()), resourcePool, podPool]
(const litebus::Future<std::shared_ptr<V2HorizontalPodAutoscaler>> &scaler) {
if (scaler.IsError()) {
YRLOG_ERROR("{}|UpdatePodPool Update HPA failed.", podPool->id);
podPool->status = static_cast<int32_t>(PoolState::FAILED);
podPool->msg = "update HPA failed";
litebus::Async(aid, &ScalerActor::DoPersistencePoolInfo, podPool->id);
return;
}
litebus::Async(aid, &ScalerActor::UpdatePodPoolWithResourcePool, podPool, resourcePool);
});
return;
}
return UpdatePodPoolWithResourcePool(podPool, resourcePool);
}
litebus::Future<std::shared_ptr<V2HorizontalPodAutoscaler>> ScalerActor::UpdateHPA(
const std::shared_ptr<PodPoolInfo> &podPool,
const ResourcePool &pool,
const std::shared_ptr<V2HorizontalPodAutoscalerList> &list)
{
for (auto item : list->GetItems()) {
if (item->GetMetadata()->GetName() == pool.horizontalPodAutoscaler->GetMetadata()->GetName()) {
YRLOG_INFO("start to modify hpa {}", item->GetMetadata()->GetName());
auto oldItem = item->ToJson();
item->SetSpec(pool.horizontalPodAutoscaler->GetSpec());
auto newItem = item->ToJson();
nlohmann::json diffPatch = nlohmann::json::diff(oldItem, newItem);
std::shared_ptr<kube_client::model::Object> body = std::make_shared<kube_client::model::Object>();
bool isSet = body->FromJson(diffPatch);
if (!isSet) {
YRLOG_WARN("failed to set object from json patch({}) for pool {}, skip",
diffPatch.dump(), item->GetMetadata()->GetName());
auto promise = litebus::Promise<std::shared_ptr<V2HorizontalPodAutoscaler>>();
promise.SetFailed(static_cast<int32_t>(StatusCode::FAILED));
return promise.GetFuture();
}
return member_->kubeClient->PatchNamespacedHorizontalPodAutoscaler(item->GetMetadata()->GetName(),
member_->k8sNamespace, body);
}
}
YRLOG_WARN("update hpa({}), hpa not found, need to create", pool.horizontalPodAutoscaler->GetMetadata()->GetName());
return member_->kubeClient->CreateNamespacedHorizontalPodAutoscaler(
member_->k8sNamespace, pool.horizontalPodAutoscaler);
}
void ScalerActor::UpdatePodPoolWithResourcePool(
const std::shared_ptr<PodPoolInfo> &podPool,
const ResourcePool &resourcePool)
{
YRLOG_DEBUG("Start to update pod pool({})", podPool->id);
auto deployName = POOL_NAME_PREFIX + podPool->id;
auto deployment = member_->poolManager->GetDeployment(deployName);
if (deployment == nullptr) {
YRLOG_WARN("update pod pool({}), pool not found", podPool->id);
podPool->status = static_cast<int32_t>(PoolState::FAILED);
podPool->readyCount = 0;
podPool->msg = "pool not found";
DoPersistencePoolInfo(podPool->id);
return;
}
podPool->status = static_cast<int32_t>(PoolState::UPDATING);
podPool->msg = "Updating";
if (podPool->scalable) {
DoPersistencePoolInfo(podPool->id);
ASSERT_IF_NULL(business_);
business_->ScaleUpPodByPoolID(podPool->id, true, "", litebus::AID());
return;
}
DoUpdatePodPool(podPool, deployment, resourcePool);
}
void ScalerActor::DoUpdatePodPool(const std::shared_ptr<PodPoolInfo> &podPool,
const std::shared_ptr<V1Deployment> &deployment,
const ResourcePool &resourcePool)
{
auto oldDeploymentJson = deployment->ToJson();
auto newDeployment = ScalerActor::GenerateDeploymentByOrigin(resourcePool, deployment);
auto changedDeploymentJson = newDeployment->ToJson();
nlohmann::json deploymentDiffPatch = nlohmann::json::diff(oldDeploymentJson, changedDeploymentJson);
std::shared_ptr<kube_client::model::Object> body = std::make_shared<kube_client::model::Object>();
bool isSet = body->FromJson(deploymentDiffPatch);
if (!isSet) {
YRLOG_WARN("failed to set object from json patch({}) for pool {}, skip", deploymentDiffPatch.dump(),
podPool->id);
return;
}
(void)member_->kubeClient
->PatchNamespacedDeployment(newDeployment->GetMetadata()->GetName(),
newDeployment->GetMetadata()->GetRNamespace(), body)
.OnComplete(litebus::Defer(GetAID(), &ScalerActor::OnUpdatePodPoolComplete, std::placeholders::_1, podPool));
}
void ScalerActor::PersistencePoolInfo(const std::string &poolID)
{
ASSERT_IF_NULL(business_);
business_->PersistencePoolInfo(poolID);
}
void ScalerActor::DoPersistencePoolInfo(const std::string &poolID)
{
member_->poolManager->PersistencePoolInfo(poolID);
}
void ScalerActor::DeletePodPool(const std::shared_ptr<PodPoolInfo> &podPool)
{
auto deployName = POOL_NAME_PREFIX + podPool->id;
auto deployment = member_->poolManager->GetDeployment(deployName);
if (deployment == nullptr) {
YRLOG_WARN("delete pod pool({}), pool not found", podPool->id);
}
(void)member_->kubeClient->DeleteNamespacedDeployment(deployName, member_->k8sNamespace)
.OnComplete(litebus::Defer(GetAID(), &ScalerActor::OnDeletePodPoolComplete, std::placeholders::_1, deployName,
podPool));
}
void ScalerActor::OnDeletePodPoolComplete(const litebus::Future<std::shared_ptr<V1Deployment>> &retDeployment,
const std::string &name, const std::shared_ptr<PodPoolInfo> &podPool)
{
if (!retDeployment.IsOK() && retDeployment.GetErrorCode() != litebus::http::ResponseCode::NOT_FOUND) {
YRLOG_WARN("failed to delete pod pool {}, error code: {}", podPool->id, retDeployment.GetErrorCode());
podPool->status = static_cast<int32_t>(PoolState::FAILED);
podPool->msg = "Delete Deployment Failed, ErrorCode is " + std::to_string(retDeployment.GetErrorCode());
DoPersistencePoolInfo(podPool->id);
return;
}
member_->kubeClient->DeleteNamespacedHorizontalPodAutoscaler(name + "-hpa",
member_->k8sNamespace, litebus::None(), litebus::None());
member_->poolManager->DeleteDeployment(name);
member_->poolManager->DeletePodPoolInfo(podPool->id, true);
}
void ScalerActor::AddVolumesAndMountsForSystemFunc(ResourcePool &pool)
{
nlohmann::json volmuesJson = nlohmann::json::array();
nlohmann::json volumeMountsJson = nlohmann::json::array();
nlohmann::json initVolumeMountsJson = nlohmann::json::array();
YRLOG_INFO("ETCD auth type is {}", member_->etcdAuthType);
if (member_->etcdAuthType == "TLS") {
nlohmann::json etcdVolume;
etcdVolume["name"] = "etcd-client-certs";
etcdVolume["secret"]["secretName"] = member_->etcdSecretName;
etcdVolume["secret"]["defaultMode"] = DEFAULT_SECERT_MODE;
volmuesJson.push_back(etcdVolume);
nlohmann::json etcdVolumeMounts;
etcdVolumeMounts["name"] = "etcd-client-certs";
etcdVolumeMounts["mountPath"] = "/home/snuser/resource/etcd";
volumeMountsJson.push_back(etcdVolumeMounts);
}
ParseVolumesFromJson(volmuesJson.dump(), pool.delegateVolumes);
ParseVolumeMountsFromJson(volumeMountsJson.dump(), pool.delegateVolumeMounts);
ParseVolumeMountsFromJson(initVolumeMountsJson.dump(), pool.delegateInitVolumeMounts);
}
void ScalerActor::ParseInstanceInfoToPool(ResourcePool &pool, const ::resources::InstanceInfo &instanceInfo)
{
ParseDelegateDecryptEnv(instanceInfo, pool.delegateDecryptEnvs);
ParseDelegateContainer(instanceInfo, pool.delegateContainer);
ParseDelegateRuntimeManager(instanceInfo, pool.delegateRuntimeManager);
ParseDelegateVolumes(instanceInfo, pool.delegateVolumes);
ParseDelegateVolumeMounts(instanceInfo, "DELEGATE_VOLUME_MOUNTS", pool.delegateVolumeMounts);
ParseDelegateVolumeMounts(instanceInfo, "DELEGATE_INIT_VOLUME_MOUNTS", pool.delegateInitVolumeMounts);
ParseDelegateVolumeMounts(instanceInfo, "DELEGATE_AGENT_VOLUME_MOUNTS", pool.delegateAgentVolumeMounts);
ParseDelegateHostAliases(instanceInfo, pool.delegateHostAliases);
ParseDelegateSidecars(instanceInfo, pool.delegateSidecars);
ParseDelegateInitContainers(instanceInfo, pool.delegateInitContainers);
ParseDelegateTolerations(instanceInfo, pool.delegateTolerations);
ParseDelegateInitEnv(instanceInfo, pool.delegateInitEnvs);
pool.isSystemFunc = IsSystemFunction(instanceInfo);
if (pool.isSystemFunc) {
AddVolumesAndMountsForSystemFunc(pool);
}
pool.isNeedBindCPU = IsNeedBindCPU(pool.limitResources);
pool.affinity = ParseAffinity(GetAffinityInfo(instanceInfo));
ParseAffinityFromCreateOpts(instanceInfo, pool.affinity);
ParseNodeAffinity(instanceInfo, pool.affinity);
pool.isAggregationNodeAffinity = IsAggregationMergePolicy(instanceInfo);
pool.delegateSecCompProfile = ParseSeccompProfile(instanceInfo);
pool.terminationGracePeriodSeconds = instanceInfo.gracefulshutdowntime();
}
void ScalerActor::DoCreateAgent(ResourcePool &pool, const ::resources::InstanceInfo &instanceInfo,
const std::shared_ptr<messages::CreateAgentRequest> &createAgentRequest,
const litebus::AID &from)
{
ParseInstanceInfoToPool(pool, instanceInfo);
YRLOG_INFO("{}|received a create agent pool({}) from {}.", instanceInfo.requestid(), instanceInfo.instanceid(),
std::string(from));
auto spec = GeneratePodTemplateSpec(pool, member_->templateDeployment);
(void)CreatePod(
GeneratePodName(pool.name, instanceInfo.function(), pool.delegateContainer.IsSome(), pool.isSystemFunc),
spec,
instanceInfo.instanceid(), GetPodLabelsForCreate(instanceInfo), ParseDelegateAnnotations(instanceInfo))
.OnComplete(litebus::Defer(GetAID(), &ScalerActor::OnCreatePodComplete, std::placeholders::_1,
createAgentRequest, from, pool.delegateContainer.IsSome()));
}
void ScalerActor::DoScaleUpPodByPoolID(const std::string &poolID, bool isReserved, const std::string &requestID,
const litebus::AID &from)
{
ASSERT_IF_NULL(business_);
business_->ScaleUpPodByPoolID(poolID, isReserved, requestID, from);
}
void ScalerActor::ScaleUpPodByPoolID(const std::string &poolID, bool isReserved, const std::string &requestID,
const litebus::AID &from)
{
auto pod = member_->poolManager->TryScaleUpPod(poolID, isReserved);
if (pod.IsNone()) {
if (!requestID.empty()) {
(void)member_->creatingRequests.erase(requestID);
(void)Send(from, "CreateAgentResponse",
GenCreateAgentResponse(requestID, static_cast<int32_t>(StatusCode::GS_START_CREATE_POD_FAILED),
"failed to scale up pod")
.SerializeAsString());
}
return;
}
auto targetPod = pod.Get();
YRLOG_INFO("{}|start to scale up a new pod({})", requestID, targetPod->GetMetadata()->GetName());
member_->kubeClient->CreateNamespacedPod(member_->templateDeployment->GetMetadata()->GetRNamespace(), targetPod)
.OnComplete(litebus::Defer(GetAID(), &ScalerActor::OnScaleUpPodComplete, std::placeholders::_1, targetPod,
poolID, requestID, from));
}
litebus::Future<Status> ScalerActor::DoUpdateNodeTaint(const std::shared_ptr<messages::UpdateNodeTaintRequest> &req,
const litebus::AID &from)
{
if (auto iter = updateNodeMap_.find(req->ip()); iter != updateNodeMap_.end()) {
YRLOG_DEBUG("DoUpdateNodeTaint wait: {}", req->ip());
return iter->second.GetFuture().Then(litebus::Defer(GetAID(), &ScalerActor::DoUpdateNodeTaint, req, from));
}
litebus::Promise<bool> promise;
updateNodeMap_[req->ip()] = promise;
YRLOG_DEBUG("DoUpdateNodeTaint: {}", req->ip());
auto taintKey = req->key();
if (req->key().find(FUNCTION_PROXY_TAINT_KEY) != std::string::npos) {
taintKey = member_->proxyTaintKey;
}
(void)HandleNodeTaint(req->ip(), taintKey, !req->healthy()).OnComplete(
litebus::Defer(GetAID(), &ScalerActor::OnUpdateNodeTaintsComplete, std::placeholders::_1, req, from));
return Status::OK();
}
void ScalerActor::OnUpdateNodeTaintsComplete(const litebus::Future<std::shared_ptr<V1Node>> &retNode,
const std::shared_ptr<messages::UpdateNodeTaintRequest> &req,
const litebus::AID &from)
{
if (auto iter = updateNodeMap_.find(req->ip()); iter != updateNodeMap_.end()) {
iter->second.SetValue(true);
updateNodeMap_.erase(req->ip());
}
messages::UpdateNodeTaintResponse rsp;
rsp.set_requestid(req->requestid());
if (retNode.IsError()) {
YRLOG_WARN("{}|failed to update node({}) taints({}) healthy({})", req->requestid(), req->ip(), req->key(),
req->healthy());
rsp.set_code(static_cast<int32_t>(StatusCode::FAILED));
rsp.set_message("error code : " + std::to_string(retNode.GetErrorCode()));
} else {
YRLOG_INFO("{}|update node taints successfully", req->requestid());
rsp.set_code(static_cast<int32_t>(StatusCode::SUCCESS));
}
(void)Send(from, "UpdateNodeTaintsResponse", rsp.SerializeAsString());
}
litebus::Future<std::shared_ptr<V1Node>> ScalerActor::HandleNodeTaint(const std::string &ip, const std::string &key,
const bool isAdd)
{
return member_->kubeClient->ListNode()
.Then(litebus::Defer(GetAID(), &ScalerActor::DoSyncNode, std::placeholders::_1))
.Then(litebus::Defer(GetAID(), &ScalerActor::DoHandleNodeTaint, ip, key, isAdd, std::placeholders::_1));
}
litebus::Future<std::shared_ptr<V1Node>> ScalerActor::DoHandleNodeTaint(
const std::string &ip, const std::string &key, const bool isAdd,
const std::unordered_map<std::string, std::shared_ptr<V1Node>> &nodes)
{
auto iter = nodes.find(ip);
if (iter == nodes.end()) {
return std::make_shared<V1Node>();
}
YRLOG_INFO("try to update taint key({}) to ip({}) isAdd({})", key, ip, isAdd);
auto node = iter->second;
if (key == member_->workerTaintKey && CheckNodeLabels(node)) {
YRLOG_INFO("skip taint key({}) for node({})", key, ip);
return node;
}
auto newNode = std::make_shared<V1Node>();
(void)newNode->FromJson(node->ToJson());
if (!newNode->GetSpec()->TaintsIsSet()) {
newNode->GetSpec()->SetTaints({});
}
auto taints = newNode->GetSpec()->GetTaints();
bool hasKey = false;
for (auto nodeIter = taints.begin(); nodeIter != taints.end(); ++nodeIter) {
if ((*nodeIter)->GetKey() == key) {
YRLOG_INFO("HandleNodeTaint member node({}) has taint key({})", node->GetMetadata()->GetName(), key);
(void)taints.erase(nodeIter);
hasKey = true;
break;
}
}
if (isAdd && hasKey) {
YRLOG_INFO("HandleNodeTaint add taint key and already exist node({})", node->GetMetadata()->GetName());
return std::make_shared<V1Node>();
}
if (!isAdd && !hasKey) {
YRLOG_INFO("HandleNodeTaint del taint key and key not exist node({})", node->GetMetadata()->GetName());
return std::make_shared<V1Node>();
}
if (isAdd) {
YRLOG_INFO("HandleNodeTaint add taint key node({})", node->GetMetadata()->GetName());
auto nodeTaint = std::make_shared<V1Taint>();
nodeTaint->SetKey(key);
nodeTaint->SetValue("true");
nodeTaint->SetEffect(TAINT_EFFECT_PREFER_NO_SCHEDULE);
taints.push_back(nodeTaint);
}
newNode->GetSpec()->SetTaints(taints);
nlohmann::json nodeDiffPatch = nlohmann::json::diff(node->ToJson(), newNode->ToJson());
std::shared_ptr<functionsystem::kube_client::model::Object> body =
std::make_shared<functionsystem::kube_client::model::Object>();
(void)body->FromJson(nodeDiffPatch);
return member_->kubeClient->PatchNode(node->GetMetadata()->GetName(), body);
}
bool ScalerActor::CheckNodeLabels(const std::shared_ptr<V1Node> &node)
{
if (!node->GetMetadata()->LabelsIsSet() || member_->workerTaintExcludeLabels.size() == 0) {
return false;
}
auto nodeLabels = node->GetMetadata()->GetLabels();
for (auto it = member_->workerTaintExcludeLabels.begin(); it != member_->workerTaintExcludeLabels.end(); ++it) {
if (nodeLabels.find(it->first) != nodeLabels.end() && nodeLabels[it->first] == it->second) {
return true;
}
}
return false;
}
void ScalerActor::ParseParams(const functionsystem::functionmaster::Flags &flags)
{
ParseFaultRecoveryParams(flags);
ParseEtcdParams(flags);
ParseK8sClientParams(flags);
std::string prefix = flags.GetSelfTaintPrefix();
prefix = litebus::strings::Trim(prefix);
if (!prefix.empty()) {
member_->workerTaintKey = prefix + DS_WORKER_TAINT_KEY;
member_->proxyTaintKey = prefix + FUNCTION_PROXY_TAINT_KEY;
}
}
void ScalerActor::ParseK8sClientParams(const functionsystem::functionmaster::Flags &flags)
{
member_->kubeClient->SetK8sClientRetryTime(flags.GetKubeClientRetryTimes());
member_->kubeClient->SetK8sClientCycMs(flags.GetKubeClientRetryCycleMs());
}
void ScalerActor::ParseEtcdParams(const functionsystem::functionmaster::Flags &flags)
{
member_->etcdAuthType = flags.GetETCDAuthType();
member_->etcdSecretName = flags.GetEtcdSecretName();
}
void ScalerActor::ParseFaultRecoveryParams(const functionsystem::functionmaster::Flags &flags)
{
member_->evictedTaintKey = flags.GetEvictedTaintKey();
member_->localSchedPort = flags.GetLocalSchedulerPort();
auto migrateEnable = flags.GetMigrateEnable();
member_->migrateEnable = migrateEnable;
if (migrateEnable) {
member_->migrateResourcePrefix = flags.GetMigratePrefix();
}
auto taintToleranceList = flags.GetTaintToleranceList();
if (migrateEnable && !taintToleranceList.empty()) {
auto faults = litebus::strings::Split(taintToleranceList, SEMICOLON_SIGN_SPLIT);
for (auto item : faults) {
member_->taintToleranceSet.emplace(item);
}
}
auto nodeLabels = flags.GetWorkerTaintExcludeLabels();
if (nodeLabels.empty()) {
return;
}
auto labelArr = litebus::strings::Split(nodeLabels, SEMICOLON_SIGN_SPLIT);
for (auto item : labelArr) {
if (item.empty()) {
return;
}
auto label = litebus::strings::Split(item, EQUAL_SIGN_SPLIT);
if (label.size() == LABEL_ITEM_LENGTH) {
member_->workerTaintExcludeLabels.emplace(label[0], label[1]);
}
}
}
void ScalerActor::MasterBusiness::OnChange()
{
if (!member_->isSynced) {
YRLOG_WARN("resource synchronization is not complete");
return;
}
(void)CreatePodPools(member_->poolManager->GetLocalPodPools());
auto actor = actor_.lock();
ASSERT_IF_NULL(actor);
actor->SyncPodAndInstance().OnComplete(
[aid(actor->GetAID())]() { litebus::Async(aid, &ScalerActor::SyncKubeResourceForChange); });
}
void ScalerActor::MasterBusiness::UpdatePodLabels(const InstanceInfo &instanceInfo, bool isInstanceDelete)
{
auto actor = actor_.lock();
ASSERT_IF_NULL(actor);
YRLOG_INFO("{}|try to update agent labels for instance({})", instanceInfo.requestid(), instanceInfo.instanceid());
actor->AddUpdateLabelReq(instanceInfo, isInstanceDelete);
}
void ScalerActor::MasterBusiness::DeletePod(const std::string &podName)
{
auto actor = actor_.lock();
ASSERT_IF_NULL(actor);
actor->DoDeletePod(podName);
}
Status ScalerActor::MasterBusiness::CreateResourcePools(const std::shared_ptr<std::vector<ResourcePool>> &pools)
{
auto actor = actor_.lock();
ASSERT_IF_NULL(actor);
(void)actor->ListNamespacedDeployment(member_->k8sNamespace)
.OnComplete(litebus::Defer(actor->GetAID(), &ScalerActor::CreateDeployment, std::placeholders::_1, pools));
return Status(StatusCode::SUCCESS);
}
Status ScalerActor::MasterBusiness::CreatePodPools(
const std::unordered_map<std::string, std::shared_ptr<PodPoolInfo>> &pools)
{
auto actor = actor_.lock();
ASSERT_IF_NULL(actor);
if (pools.empty()) {
return Status::OK();
}
for (const auto& podPool : pools) {
actor->CreatePodPool(podPool.second);
}
return Status::OK();
}
void ScalerActor::MasterBusiness::DeletePod(const litebus::AID &from, std::string &&name, std::string &&msg)
{
auto actor = actor_.lock();
ASSERT_IF_NULL(actor);
auto deletePodRequest = std::make_shared<messages::DeletePodRequest>();
if (!deletePodRequest->ParseFromString(msg)) {
YRLOG_ERROR("failed to parse request for DeletePod.");
return;
}
YRLOG_INFO("{}|receive delete pod request, agent: {}, msg: {}", deletePodRequest->requestid(),
deletePodRequest->functionagentid(), deletePodRequest->message());
actor->DoDeletePod(deletePodRequest->functionagentid())
.OnComplete(litebus::Defer(actor->GetAID(), &ScalerActor::OnDeletePodComplete, std::placeholders::_1, from,
deletePodRequest->requestid()));
}
void ScalerActor::OnDeletePodComplete(const litebus::Future<Status> &status, const litebus::AID &from,
const std::string &requestID)
{
auto deletePodResponse = std::make_shared<messages::DeletePodResponse>();
deletePodResponse->set_requestid(requestID);
if (status.IsError()) {
deletePodResponse->set_code(status.GetErrorCode());
} else {
deletePodResponse->set_code(0);
}
Send(from, "DeletePodResponse", deletePodResponse->SerializeAsString());
}
void ScalerActor::MasterBusiness::CreateAgent(const litebus::AID &from, std::string &&name, std::string &&msg)
{
auto actor = actor_.lock();
ASSERT_IF_NULL(actor);
auto createAgentRequest = std::make_shared<messages::CreateAgentRequest>();
if (!createAgentRequest->ParseFromString(msg)) {
YRLOG_ERROR("failed to parse request for CreateAgent.");
actor->SendCreateAgentFailedResponse(from, static_cast<int32_t>(StatusCode::GS_START_CREATE_POD_FAILED),
"resources or requestID is illegal.", *createAgentRequest);
return;
}
const auto &instanceInfo = createAgentRequest->instanceinfo();
if (!instanceInfo.has_resources() || instanceInfo.requestid().empty()) {
YRLOG_ERROR("instance's resources or requestID is illegal.");
actor->SendCreateAgentFailedResponse(from, static_cast<int32_t>(StatusCode::GS_START_CREATE_POD_FAILED),
"resources or requestID is illegal.", *createAgentRequest);
return;
}
const std::string &requestID = instanceInfo.requestid();
if (member_->creatingRequests.find(requestID) != member_->creatingRequests.end()) {
YRLOG_ERROR("{}|repeated to create agent.", requestID);
return;
}
member_->creatingRequests[requestID] = *createAgentRequest;
if (actor->SyncTemplateDeployment().IsError()) {
YRLOG_ERROR("{}|invalid template deployment in instance({}).", requestID, instanceInfo.instanceid());
actor->SendCreateAgentFailedResponse(from, static_cast<int32_t>(StatusCode::GS_START_CREATE_POD_FAILED),
"invalid template deployment.", *createAgentRequest);
return;
}
if (auto it = instanceInfo.createoptions().find(AFFINITY_POOL_ID);
it != instanceInfo.createoptions().end() && !it->second.empty()) {
actor->ScaleUpPodByPoolID(it->second, false, requestID, from);
return;
}
auto resourcesMap = actor->CheckAndBuildResourcesMap(instanceInfo);
if (resourcesMap.size() == 0) {
actor->SendCreateAgentFailedResponse(from, static_cast<int32_t>(StatusCode::GS_START_CREATE_POD_FAILED),
"resources is illegal.", *createAgentRequest);
return;
}
ResourcePool pool = actor->BuildResourcePool(instanceInfo, resourcesMap);
actor->DoCreateAgent(pool, instanceInfo, createAgentRequest, from);
}
void ScalerActor::MasterBusiness::UpdateNodeTaint(const litebus::AID &from, std::string && ,
std::string &&msg)
{
auto actor = actor_.lock();
ASSERT_IF_NULL(actor);
auto req = std::make_shared<messages::UpdateNodeTaintRequest>();
if (!req->ParseFromString(msg)) {
YRLOG_ERROR("failed to parse request for UpdateNodeTaint.");
return;
}
YRLOG_INFO("{}|receive update node({}) taint request, key({}) healthy({})", req->requestid(), req->ip(), req->key(),
req->healthy());
(void)actor->DoUpdateNodeTaint(req, from);
}
void ScalerActor::MasterBusiness::CreatePodPool(const std::shared_ptr<PodPoolInfo> &podPool)
{
auto actor = actor_.lock();
ASSERT_IF_NULL(actor);
actor->CreatePodPool(podPool);
}
void ScalerActor::MasterBusiness::UpdatePodPool(const std::shared_ptr<PodPoolInfo> &podPool)
{
auto actor = actor_.lock();
ASSERT_IF_NULL(actor);
actor->UpdatePodPool(podPool);
}
void ScalerActor::MasterBusiness::PersistencePoolInfo(const std::string &poolID)
{
auto actor = actor_.lock();
ASSERT_IF_NULL(actor);
actor->DoPersistencePoolInfo(poolID);
}
void ScalerActor::MasterBusiness::DeletePodPool(const std::shared_ptr<PodPoolInfo> &podPool)
{
auto actor = actor_.lock();
ASSERT_IF_NULL(actor);
actor->DeletePodPool(podPool);
}
void ScalerActor::MasterBusiness::DeletePodNotBindInstance(const std::shared_ptr<V1Pod> &pod)
{
auto actor = actor_.lock();
ASSERT_IF_NULL(actor);
actor->DeletePodNotBindInstance(pod);
}
void ScalerActor::MasterBusiness::UpdatePodLabelsWithoutInstance(const std::shared_ptr<V1Pod> &pod)
{
auto actor = actor_.lock();
ASSERT_IF_NULL(actor);
actor->UpdatePodLabelsWithoutInstance(pod);
}
void ScalerActor::MasterBusiness::MigratePodInstanceWithTaints(const std::string &hostIP, const std::string &podName)
{
auto actor = actor_.lock();
ASSERT_IF_NULL(actor);
actor->MigratePodInstanceWithTaints(hostIP, podName);
}
void ScalerActor::MasterBusiness::MigrateNodeInstanceWithTaints(const std::shared_ptr<V1Node> node)
{
auto actor = actor_.lock();
ASSERT_IF_NULL(actor);
actor->MigrateNodeInstanceWithTaints(node);
}
void ScalerActor::MasterBusiness::CheckPodStatus()
{
YRLOG_DEBUG("Start to check pod status");
for (auto it = member_->pendingPods.begin(); it != member_->pendingPods.end();) {
if (member_->podNameMap.find(it->first) == member_->podNameMap.end()) {
it = member_->pendingPods.erase(it);
continue;
}
auto pod = member_->podNameMap.find(it->first)->second;
if (!IsPendingPod(pod)) {
it = member_->pendingPods.erase(it);
continue;
}
int threshold = DEFAULT_POD_PENDING_DURATION_ALARM_THRESHOLD;
auto annotations = pod->GetMetadata()->GetAnnotations();
try {
if (auto annoIt = annotations.find(PENDING_THRESHOLD);
annoIt != annotations.end() && std::stoi(annoIt->second)) {
threshold = std::stoi(annoIt->second);
}
} catch (const std::exception &e) {
YRLOG_WARN("Failed to parse pod pending duration alarm threshold {}, error: {}",
annotations.find(PENDING_THRESHOLD)->second, e.what());
}
if (IsTimeoutPendingPod(pod, it->second, threshold)) {
auto tp = std::chrono::system_clock::to_time_t(it->second);
std::stringstream ss;
ss << std::put_time(std::localtime(&tp), "%Y-%m-%dT%H:%M:%S");
std::string cause = "which has been pending since " + ss.str();
YRLOG_WARN("Function master sends {} pending alarm, {}", pod->GetMetadata()->GetName(), cause);
metrics::MetricsAdapter::GetInstance().SendPodAlarm(pod->GetMetadata()->GetName(), cause);
}
++it;
}
}
void ScalerActor::MasterBusiness::HandleAbnormalPod(const std::shared_ptr<V1Pod> pod)
{
auto podName = pod->GetMetadata()->GetName();
if (IsPendingPod(pod)) {
if (member_->pendingPods.find(podName) == member_->pendingPods.end()) {
member_->pendingPods.emplace(podName, std::chrono::system_clock::now());
}
return;
} else {
member_->pendingPods.erase(podName);
}
if (IsAbnormalPod(pod)) {
auto cause = "whose status is changed to " + pod->GetStatus()->GetPhase();
YRLOG_WARN("Function master sends {} pending alarm, {}", podName, cause);
metrics::MetricsAdapter::GetInstance().SendPodAlarm(podName, cause);
}
}
void ScalerActor::MasterBusiness::ScaleUpPodByPoolID(const std::string &poolID, bool isReserved,
const std::string &requestID, const litebus::AID &from)
{
auto actor = actor_.lock();
ASSERT_IF_NULL(actor);
actor->ScaleUpPodByPoolID(poolID, isReserved, requestID, from);
}
void ScalerActor::SlaveBusiness::OnChange()
{
}
void ScalerActor::SlaveBusiness::UpdatePodLabels(const InstanceInfo &, bool)
{
}
void ScalerActor::SlaveBusiness::DeletePod(const std::string &)
{
}
Status ScalerActor::SlaveBusiness::CreateResourcePools(const std::shared_ptr<std::vector<ResourcePool>> &)
{
return Status::OK();
}
Status ScalerActor::SlaveBusiness::CreatePodPools(
const std::unordered_map<std::string, std::shared_ptr<PodPoolInfo>> &pools)
{
return Status::OK();
}
void ScalerActor::SlaveBusiness::CreateAgent(const litebus::AID &from, std::string &&name, std::string &&msg)
{
}
void ScalerActor::SlaveBusiness::DeletePod(const litebus::AID &from, std::string &&name, std::string &&msg)
{
}
void ScalerActor::SlaveBusiness::UpdateNodeTaint(const litebus::AID & , std::string && ,
std::string && )
{
}
void ScalerActor::SlaveBusiness::DeletePodNotBindInstance(const std::shared_ptr<V1Pod> &pod)
{
}
void ScalerActor::SlaveBusiness::UpdatePodLabelsWithoutInstance(const std::shared_ptr<V1Pod> &pod)
{
}
void ScalerActor::SlaveBusiness::MigratePodInstanceWithTaints(const std::string &hostIP, const std::string &podName)
{
}
void ScalerActor::SlaveBusiness::MigrateNodeInstanceWithTaints(const std::shared_ptr<V1Node> node)
{
}
void ScalerActor::SlaveBusiness::CreatePodPool(const std::shared_ptr<PodPoolInfo> &podPool)
{
}
void ScalerActor::SlaveBusiness::UpdatePodPool(const std::shared_ptr<PodPoolInfo> &podPool)
{
}
void ScalerActor::SlaveBusiness::PersistencePoolInfo(const std::string &poolID)
{
}
void ScalerActor::SlaveBusiness::DeletePodPool(const std::shared_ptr<PodPoolInfo> &podPool)
{
}
void ScalerActor::SlaveBusiness::ScaleUpPodByPoolID(const std::string &poolID, bool isReserved,
const std::string &requestID, const litebus::AID &from)
{
}
void ScalerActor::SlaveBusiness::CheckPodStatus()
{
}
void ScalerActor::SlaveBusiness::HandleAbnormalPod(const std::shared_ptr<V1Pod> pod)
{
}
bool ScalerActor::GetIsUpgrading()
{
YRLOG_DEBUG("system upgrading is {}", member_->isUpgrading);
return member_->isUpgrading;
}
void ScalerActor::SetIsUpgrading(bool isUpgrading)
{
YRLOG_DEBUG("set system upgrading to {}", isUpgrading);
member_->isUpgrading = isUpgrading;
}
void ScalerActor::AddUpdateLabelReq(const InstanceInfo &instanceInfo, bool isInstanceDelete)
{
YRLOG_INFO("add update label request for instance({})", instanceInfo.instanceid());
updateLabelMap_[instanceInfo.instanceid()].waitingReq =
std::make_shared<std::pair<resource_view::InstanceInfo, bool>>(instanceInfo, isInstanceDelete);
if (updateLabelMap_[instanceInfo.instanceid()].patchingReq == nullptr) {
YRLOG_INFO("there is no running update label request for instance({}), trigger", instanceInfo.instanceid());
TriggerUpdateLabel(instanceInfo.instanceid(), nullptr);
}
}
void ScalerActor::TriggerUpdateLabel(std::string instanceID, const std::shared_ptr<V1Pod> &pod)
{
updateLabelMap_[instanceID].patchingReq = nullptr;
if (updateLabelMap_[instanceID].waitingReq == nullptr) {
(void)updateLabelMap_.erase(instanceID);
return;
}
YRLOG_INFO("trigger update label for instance({})", instanceID);
updateLabelMap_[instanceID].patchingReq = updateLabelMap_[instanceID].waitingReq;
updateLabelMap_[instanceID].waitingReq = nullptr;
UpdatePodLabelsWithLatestPod(updateLabelMap_[instanceID].patchingReq->first,
updateLabelMap_[instanceID].patchingReq->second, pod);
}
std::shared_ptr<V2HorizontalPodAutoscaler> ScalerActor::GenerateV2HorizontalPodAutoscaler(
const ResourcePool &pool, const std::string &rNamespace)
{
if (pool.horizontalPodAutoscaler == nullptr) {
return nullptr;
}
const char* env = std::getenv("K8S_API_VERSION");
std::string version = (env && std::string(env) == "v2") ? "v2" : "v2beta2";
pool.horizontalPodAutoscaler->SetApiVersion("autoscaling/" + version);
pool.horizontalPodAutoscaler->SetKind("HorizontalPodAutoscaler");
auto metaData = std::make_shared<V1ObjectMeta>();
metaData->SetName(PoolDeploymentName(pool.name) + "-hpa");
metaData->SetRNamespace(rNamespace);
pool.horizontalPodAutoscaler->SetMetadata(metaData);
auto ref = std::make_shared<V2CrossVersionObjectReference>();
ref->SetApiVersion("apps/v1");
ref->SetKind("Deployment");
ref->SetName(PoolDeploymentName(pool.name));
pool.horizontalPodAutoscaler->GetSpec()->SetScaleTargetRef(ref);
return pool.horizontalPodAutoscaler;
}
litebus::Future<SyncResult> ScalerActor::SystemUpgradeSyncer(const std::shared_ptr<GetResponse> &getResponse)
{
auto prefixKey = member_->systemUpgradeParam.systemUpgradeKey;
if (getResponse->status.IsError()) {
YRLOG_INFO("failed to get key({}) from meta storage", prefixKey);
return SyncResult{ getResponse->status };
}
if (getResponse->kvs.empty()) {
YRLOG_INFO("get no result with key({}) from meta storage, revision is {}", prefixKey,
getResponse->header.revision);
return SyncResult{ Status::OK() };
}
std::vector<WatchEvent> events;
for (auto &kv : getResponse->kvs) {
WatchEvent event{ .eventType = EVENT_TYPE_PUT, .kv = kv, .prevKv = {} };
(void)events.emplace_back(event);
}
litebus::Async(GetAID(), &ScalerActor::UpdateSystemUpgradeSwitchEvent, events);
return SyncResult{ Status::OK() };
}
litebus::Future<SyncResult> ScalerActor::InstanceInfoSyncer(const std::shared_ptr<GetResponse> &getResponse)
{
if (getResponse->status.IsError()) {
YRLOG_INFO("failed to get key({}) from meta storage", INSTANCE_PATH_PREFIX);
return SyncResult{ getResponse->status };
}
if (getResponse->kvs.empty()) {
YRLOG_INFO("get no result with key({}) from meta storage, revision is {}", INSTANCE_PATH_PREFIX,
getResponse->header.revision);
return SyncResult{ Status::OK() };
}
std::set<std::string> etcdKvSet;
std::vector<WatchEvent> events;
for (auto &kv : getResponse->kvs) {
auto eventKey = TrimKeyPrefix(kv.key(), member_->metaStorageAccessor->GetMetaClient()->GetTablePrefix());
auto instanceID = GetInstanceID(eventKey);
WatchEvent event{ .eventType = EVENT_TYPE_PUT, .kv = kv, .prevKv = {} };
(void)events.emplace_back(event);
etcdKvSet.emplace(instanceID);
}
UpdateInstanceEvent(events);
std::unordered_map<std::string, std::string> needToRemove;
for (const auto &elem : member_->instanceId2PodName) {
if (auto it = etcdKvSet.find(elem.first); it == etcdKvSet.end()) {
needToRemove[elem.first] = elem.second;
}
}
for (const auto &elem : needToRemove) {
member_->instanceId2PodName.erase(elem.first);
member_->podName2InstanceId.erase(elem.second);
auto iter = member_->podNameMap.find(elem.second);
if (iter == member_->podNameMap.end()) {
YRLOG_WARN("failed to find pod({}) for instance({})", elem.second, elem.first);
continue;
}
ASSERT_IF_NULL(business_);
business_->DeletePodNotBindInstance(iter->second);
business_->UpdatePodLabelsWithoutInstance(iter->second);
}
return SyncResult{ Status::OK() };
}
litebus::Future<SyncResult> ScalerActor::PodPoolSyncer(const std::shared_ptr<GetResponse> &getResponse)
{
if (getResponse->status.IsError()) {
YRLOG_INFO("failed to get key({}) from meta storage", POD_POOL_PREFIX);
return SyncResult{ getResponse->status };
}
if (getResponse->kvs.empty()) {
YRLOG_INFO("get no result with key({}) from meta storage, revision is {}", POD_POOL_PREFIX,
getResponse->header.revision);
return SyncResult{ Status::OK() };
}
std::vector<WatchEvent> events;
std::set<std::string> etcdKvSet;
for (auto &kv : getResponse->kvs) {
auto eventKey = TrimKeyPrefix(kv.key(), member_->metaStorageAccessor->GetMetaClient()->GetTablePrefix());
auto poolID = GetPodPoolID(eventKey);
auto poolInfo = member_->poolManager->GetPodPool(poolID);
etcdKvSet.emplace(poolID);
if (poolInfo == nullptr) {
WatchEvent event{ .eventType = EVENT_TYPE_PUT, .kv = kv, .prevKv = {} };
(void)events.emplace_back(event);
continue;
}
auto cacheJson = member_->poolManager->TransToJsonFromPodPoolInfo(poolInfo);
if (cacheJson != kv.value() || poolInfo->status == static_cast<int32_t>(PoolState::DELETED)) {
WatchEvent event{ .eventType = EVENT_TYPE_PUT, .kv = kv, .prevKv = {} };
(void)events.emplace_back(event);
}
}
UpdatePodPoolEvent(events);
auto poolMap = member_->poolManager->GetPoolMap();
for (auto it = poolMap.cbegin(); it != poolMap.cend();) {
if (etcdKvSet.count(it->first) == 0) {
it = poolMap.erase(it);
} else {
++it;
}
}
return SyncResult{ Status::OK() };
}
void ScalerActor::StartCheckPodStatus()
{
ASSERT_IF_NULL(business_);
business_->CheckPodStatus();
litebus::AsyncAfter(POD_STATUS_CHECK_INTERVAL_MS, GetAID(), &ScalerActor::StartCheckPodStatus);
}
void ScalerActor::StartCheckSystemFunctionNeedScale()
{
if (member_->frontendManager != nullptr) {
member_->frontendManager->CheckSystemFunctionNeedScale();
}
litebus::AsyncAfter(CHECK_SYSTEM_FUNCTION_INTERVAL, GetAID(), &ScalerActor::StartCheckSystemFunctionNeedScale);
}
Status ScalerActor::SyncTemplateDeployment()
{
if (member_->templateDeployment != nullptr) {
YRLOG_INFO("function agent template has exist");
return Status::OK();
}
litebus::Promise<std::shared_ptr<V1Deployment>> promise;
if (member_->agentTemplatePath.empty() || !litebus::os::ExistPath(member_->agentTemplatePath)) {
YRLOG_INFO("function agent deployment template json file({}) not exist", member_->agentTemplatePath);
return Status(StatusCode::FILE_NOT_FOUND);
}
std::string jsonStr = Read(member_->agentTemplatePath);
nlohmann::json confJson;
try {
confJson = nlohmann::json::parse(jsonStr);
} catch (nlohmann::detail::parse_error &e) {
YRLOG_ERROR("parse json failed, {}, error: {}", jsonStr, e.what());
return Status(StatusCode::JSON_PARSE_ERROR);
}
member_->templateDeployment = std::make_shared<V1Deployment>();
if (!member_->templateDeployment->FromJson(confJson)) {
YRLOG_WARN("failed to decode from template deployment.");
}
member_->poolManager->PutDeployment(member_->templateDeployment);
GetDsWorkerPort(member_->templateDeployment);
return Status::OK();
}
}