/* Copyright(C) 2026. Huawei Technologies Co.,Ltd. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workload
import (
"context"
"encoding/json"
"fmt"
"strconv"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"
"ascend-common/common-utils/hwlog"
"infer-operator/pkg/api/v1"
"infer-operator/pkg/common"
)
// DeploymentHandler manages Deployment-backed workloads (and their Services)
// through a controller-runtime client.
type DeploymentHandler struct {
// client is used for every Get/List/Create/Update/Delete call made by the handler.
client client.Client
}
// NewDeploymentHandler returns a DeploymentHandler that issues its API calls
// through the supplied client.
func NewDeploymentHandler(client client.Client) *DeploymentHandler {
	handler := new(DeploymentHandler)
	handler.client = client
	return handler
}
// DeploymentWorkLoad wraps an apps/v1 Deployment so it can be accessed through
// the workload accessor methods defined on this type.
type DeploymentWorkLoad struct {
// Deployment is embedded by pointer, so its fields are promoted onto the wrapper.
*appsv1.Deployment
}
// SetWorkLoadObjMeta overwrites the ObjectMeta of the wrapped Deployment with
// objectMeta. Calling it on a nil receiver does nothing.
func (d *DeploymentWorkLoad) SetWorkLoadObjMeta(objectMeta metav1.ObjectMeta) {
	if d != nil {
		d.ObjectMeta = objectMeta
	}
}
// GetWorkLoadObjMeta returns a copy of the wrapped Deployment's ObjectMeta.
// A nil receiver yields the zero-value ObjectMeta.
func (d *DeploymentWorkLoad) GetWorkLoadObjMeta() metav1.ObjectMeta {
	var meta metav1.ObjectMeta
	if d != nil {
		meta = d.ObjectMeta
	}
	return meta
}
// IsWorkLoadReady reports whether the wrapped Deployment has fully rolled out.
//
// It returns true only when all of the following hold:
//   - the status has observed the current generation;
//   - ReadyReplicas, AvailableReplicas and UpdatedReplicas all equal the desired
//     replica count (Spec.Replicas, or common.DefaultReplicas when unset);
//   - the Available and Progressing conditions are both present with status True.
//
// A nil receiver, or a wrapper without an underlying Deployment, is never ready.
func (d *DeploymentWorkLoad) IsWorkLoadReady() bool {
	if d == nil || d.Deployment == nil {
		return false
	}

	desiredReplicas := common.DefaultReplicas
	if d.Spec.Replicas != nil {
		desiredReplicas = *d.Spec.Replicas
	}

	// The status still describes an older spec; its counters cannot be trusted yet.
	if d.Generation > 0 && d.Status.ObservedGeneration < d.Generation {
		hwlog.RunLog.Warnf("Deployment %s/%s is not latest", d.Namespace, d.Name)
		return false
	}

	if d.Status.ReadyReplicas != desiredReplicas ||
		d.Status.AvailableReplicas != desiredReplicas ||
		d.Status.UpdatedReplicas != desiredReplicas {
		return false
	}

	available := getDeploymentCondition(d.Status.Conditions, appsv1.DeploymentAvailable)
	if available == nil || available.Status != corev1.ConditionTrue {
		hwlog.RunLog.Warnf("Deployment %s/%s is not available, Condition<%s> is not true",
			d.Namespace, d.Name, appsv1.DeploymentAvailable)
		return false
	}

	progressing := getDeploymentCondition(d.Status.Conditions, appsv1.DeploymentProgressing)
	if progressing == nil || progressing.Status != corev1.ConditionTrue {
		// Report the condition that was actually checked (Progressing, not Available).
		hwlog.RunLog.Warnf("Deployment %s/%s is not progressing, Condition<%s> is not true",
			d.Namespace, d.Name, appsv1.DeploymentProgressing)
		return false
	}
	return true
}
// GetWorkLoadReplicas returns the replica count requested in the Deployment
// spec. It falls back to common.DefaultReplicas when the receiver is nil or
// Spec.Replicas is unset.
func (d *DeploymentWorkLoad) GetWorkLoadReplicas() int32 {
	if d != nil && d.Spec.Replicas != nil {
		return *d.Spec.Replicas
	}
	return common.DefaultReplicas
}
// CheckOrCreateWorkLoad makes sure a Deployment exists for the given indexer.
//
// When the InstanceSet declares no Services, the Service for the indexer is
// checked/created first. Deployments are then listed by the indexer's labels in
// indexer.Namespace: if none is found one is created, and if more than one is
// found a warning is logged. Errors from the helpers are returned unchanged.
func (d *DeploymentHandler) CheckOrCreateWorkLoad(ctx context.Context,
	instanceSet *v1.InstanceSet,
	indexer common.InstanceIndexer) error {
	if len(instanceSet.Spec.Services) == 0 {
		if svcErr := d.checkOrCreateService(ctx, instanceSet, indexer); svcErr != nil {
			return svcErr
		}
	}

	matchLabels := common.AddLabelsFromIndexer(make(map[string]string), indexer)
	found, err := d.ListWorkLoads(ctx, matchLabels, indexer.Namespace)
	if err != nil {
		return err
	}

	switch count := len(found.Items); {
	case count == 0:
		hwlog.RunLog.Infof("deployment of <%v> not exist, try to create", indexer)
		if createErr := d.createDeployment(ctx, instanceSet, indexer); createErr != nil {
			return createErr
		}
	case count > 1:
		hwlog.RunLog.Warnf("More than one Deployment exists in InstanceSet<%s>", instanceSet.Name)
	}
	return nil
}
// checkOrCreateService looks up the Service named after the indexer in the
// InstanceSet's namespace and creates it when it does not exist.
//
// Any failure (other than NotFound on the lookup) is logged and returned as a
// requeue error built from the underlying error message.
func (d *DeploymentHandler) checkOrCreateService(
	ctx context.Context,
	instanceSet *v1.InstanceSet,
	indexer common.InstanceIndexer) error {
	key := types.NamespacedName{
		Name:      common.GetServiceNameFromIndexer(indexer),
		Namespace: instanceSet.Namespace,
	}

	err := d.client.Get(ctx, key, &corev1.Service{})
	switch {
	case err == nil:
		// Service already present, nothing to do.
		return nil
	case !errors.IsNotFound(err):
		// Log the Service that was actually looked up, not the InstanceSet name.
		hwlog.RunLog.Errorf("Failed to get service %s/%s: %v", key.Namespace, key.Name, err)
		return common.NewRequeueError(err.Error())
	}

	hwlog.RunLog.Infof("service of <%v> not exist, try to create", indexer)
	if createErr := d.createService(ctx, instanceSet, indexer); createErr != nil {
		return common.NewRequeueError(createErr.Error())
	}
	return nil
}
// createDeployment builds a Deployment from the InstanceSet's raw InstanceSpec
// and creates it, owned (controller reference) by the InstanceSet.
//
// Deployment labels are the InstanceSet labels, overridden by
// Spec.WorkloadObjectMeta.Labels, plus the indexer labels and, when the pod
// template carries it, the fault-scheduling label. Annotations are merged the
// same way from the InstanceSet and Spec.WorkloadObjectMeta. The pod template
// receives the indexer labels, the indexer environment variables and, when the
// InstanceSet enables gang scheduling, the pod-group name annotation.
//
// A spec parse error is returned as is; a create failure is logged and
// returned as a requeue error.
func (d *DeploymentHandler) createDeployment(
	ctx context.Context,
	instanceSet *v1.InstanceSet,
	indexer common.InstanceIndexer) error {
	deploymentSpec, err := d.parseDeploymentWithScheme(instanceSet.Spec.InstanceSpec)
	if err != nil {
		return err
	}

	deployLabels := common.DeepCopyLabelsMap(instanceSet.Labels)
	if deployLabels == nil {
		// Guard against a nil copy: assigning into a nil map panics.
		deployLabels = make(map[string]string)
	}
	for k, v := range instanceSet.Spec.WorkloadObjectMeta.Labels {
		deployLabels[k] = v
	}
	deployLabels = common.AddLabelsFromIndexer(deployLabels, indexer)
	// Read the template label before the indexer labels are merged into the template.
	if faultScheduling, ok := deploymentSpec.Template.Labels[common.FaultSchedulingLabelKey]; ok {
		deployLabels[common.FaultSchedulingLabelKey] = faultScheduling
	}

	deployAnnotations := common.DeepCopyLabelsMap(instanceSet.Annotations)
	if deployAnnotations == nil {
		deployAnnotations = make(map[string]string)
	}
	for k, v := range instanceSet.Spec.WorkloadObjectMeta.Annotations {
		deployAnnotations[k] = v
	}

	deploymentSpec.Template.Labels = common.AddLabelsFromIndexer(deploymentSpec.Template.Labels, indexer)
	if deploymentSpec.Template.Annotations == nil {
		deploymentSpec.Template.Annotations = map[string]string{}
	}
	if instanceSet.Labels[common.GangScheduleLabelKey] == common.TrueBool {
		deploymentSpec.Template.Annotations[common.GroupNameAnnotationKey] = common.GetPGNameFromIndexer(indexer)
	}
	common.AddEnvToPodTemplate(&deploymentSpec.Template, indexer)

	newDeployment := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:        common.GetWorkLoadNameFromIndexer(indexer),
			Namespace:   instanceSet.Namespace,
			Annotations: deployAnnotations,
			Labels:      deployLabels,
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(instanceSet, instanceSet.GroupVersionKind()),
			},
		},
		Spec: *deploymentSpec,
	}
	if err = d.client.Create(ctx, newDeployment); err != nil {
		hwlog.RunLog.Errorf("Failed to create Deployment<%s>: %v", newDeployment.Name, err)
		return common.NewRequeueError(err.Error())
	}
	return nil
}
// createService creates the Service for the given indexer in the InstanceSet's
// namespace, owned (controller reference) by the InstanceSet.
//
// The indexer labels are used both as the Service's labels and as its pod
// selector. The Service exposes the single default port. A create failure is
// logged and returned as a requeue error.
func (d *DeploymentHandler) createService(
	ctx context.Context,
	instanceSet *v1.InstanceSet,
	indexer common.InstanceIndexer) error {
	labels := common.AddLabelsFromIndexer(make(map[string]string), indexer)

	newService := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      common.GetServiceNameFromIndexer(indexer),
			Namespace: instanceSet.Namespace,
			// Copy the annotations so the Service does not share the InstanceSet's
			// map; a later write through either object would otherwise affect both.
			Annotations: common.DeepCopyLabelsMap(instanceSet.Annotations),
			Labels:      labels,
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(instanceSet, instanceSet.GroupVersionKind()),
			},
		},
		Spec: corev1.ServiceSpec{
			Selector: labels,
			Ports: []corev1.ServicePort{
				{
					Name:       common.DefaultPortName,
					Port:       common.DefaultPort,
					TargetPort: intstr.FromInt(common.DefaultPort),
				},
			},
		},
	}

	if err := d.client.Create(ctx, newService); err != nil {
		hwlog.RunLog.Errorf("Failed to create Service<%s>: %v", newService.Name, err)
		return common.NewRequeueError(err.Error())
	}
	return nil
}
// DeleteExtraWorkLoad removes the Deployments, and then the Services, whose
// instance index falls outside [0, indexLimit).
//
// Candidates are selected by the indexer's labels with the instance-index label
// removed, restricted to indexer.Namespace. Objects without an instance-index
// label, or with a non-numeric one, are left untouched. An object that is
// already gone when the delete is issued is treated as deleted.
func (d *DeploymentHandler) DeleteExtraWorkLoad(
	ctx context.Context,
	indexer common.InstanceIndexer, indexLimit int) error {
	selectLabels := common.AddLabelsFromIndexer(make(map[string]string), indexer)
	delete(selectLabels, common.InstanceIndexLabelKey)
	hwlog.RunLog.Infof("try to delete extra instances, labels: %v", selectLabels)

	deployList, err := d.ListWorkLoads(ctx, selectLabels, indexer.Namespace)
	if err != nil {
		return err
	}
	// Index into the slice rather than taking the address of the range variable.
	for i := range deployList.Items {
		deploy := &deployList.Items[i]
		instanceIndexStr, ok := deploy.Labels[common.InstanceIndexLabelKey]
		if !ok {
			continue
		}
		instanceIndex, convErr := strconv.Atoi(instanceIndexStr)
		if convErr != nil {
			hwlog.RunLog.Warnf("Deployment<%s> Failed to convert instance index to int: %v",
				deploy.Name, instanceIndexStr)
			continue
		}
		if instanceIndex >= 0 && instanceIndex < indexLimit {
			continue
		}
		err = d.client.Delete(ctx, deploy)
		if errors.IsNotFound(err) {
			// Already removed (e.g. stale list result); nothing left to do.
			continue
		}
		if err != nil {
			hwlog.RunLog.Errorf("Failed to delete Deployment<%s>: %v", deploy.Name, err)
			return err
		}
		hwlog.RunLog.Infof("Delete Extra Deployment<%s>", deploy.Name)
	}
	// Keep the Service clean-up in the same namespace as the Deployments.
	return d.deleteExtraServiceInNamespace(ctx, selectLabels, indexer.Namespace, indexLimit)
}

// GetWorkLoadReadyReplicas counts the ready Deployments that match the
// indexer's labels (instance-index label removed) in indexer.Namespace.
// Readiness is decided by isDeploymentReady. On a list error it returns 0 and
// the error.
func (d *DeploymentHandler) GetWorkLoadReadyReplicas(
	ctx context.Context,
	indexer common.InstanceIndexer) (int, error) {
	selectLabels := common.AddLabelsFromIndexer(make(map[string]string), indexer)
	delete(selectLabels, common.InstanceIndexLabelKey)

	deployList, err := d.ListWorkLoads(ctx, selectLabels, indexer.Namespace)
	if err != nil {
		return 0, err
	}
	readyReplicas := 0
	for i := range deployList.Items {
		if isDeploymentReady(deployList.Items[i]) {
			readyReplicas++
		}
	}
	return readyReplicas, nil
}

// deleteExtraService deletes the Services matching selectLabels whose instance
// index is outside [0, indexLimit), without restricting the namespace.
//
// It is kept for callers that have no namespace at hand; prefer
// deleteExtraServiceInNamespace so Services in other namespaces are not touched.
func (d *DeploymentHandler) deleteExtraService(
	ctx context.Context,
	selectLabels map[string]string,
	indexLimit int) error {
	return d.deleteExtraServiceInNamespace(ctx, selectLabels, "", indexLimit)
}

// deleteExtraServiceInNamespace deletes the Services matching selectLabels
// whose instance index is outside [0, indexLimit). The listing is limited to
// namespace when it is non-empty. Failures are logged and returned as requeue
// errors; a Service that is already gone is treated as deleted.
func (d *DeploymentHandler) deleteExtraServiceInNamespace(
	ctx context.Context,
	selectLabels map[string]string,
	namespace string,
	indexLimit int) error {
	selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
		MatchLabels: selectLabels,
	})
	if err != nil {
		hwlog.RunLog.Errorf("Failed to create ServiceList<%s>: %v", selectLabels, err)
		return common.NewRequeueError(err.Error())
	}

	listOpts := []client.ListOption{client.MatchingLabelsSelector{Selector: selector}}
	if namespace != "" {
		listOpts = append(listOpts, client.InNamespace(namespace))
	}
	serviceList := &corev1.ServiceList{}
	if err = d.client.List(ctx, serviceList, listOpts...); err != nil {
		hwlog.RunLog.Errorf("Failed to list ServiceList<%s>: %v", selectLabels, err)
		return common.NewRequeueError(err.Error())
	}

	for i := range serviceList.Items {
		service := &serviceList.Items[i]
		instanceIndexStr, ok := service.Labels[common.InstanceIndexLabelKey]
		if !ok {
			continue
		}
		instanceIndex, convErr := strconv.Atoi(instanceIndexStr)
		if convErr != nil {
			hwlog.RunLog.Warnf("service<%s> Failed to convert instance index to int: %v",
				service.Name, instanceIndexStr)
			continue
		}
		if instanceIndex >= 0 && instanceIndex < indexLimit {
			continue
		}
		if err = d.client.Delete(ctx, service); err != nil && !errors.IsNotFound(err) {
			hwlog.RunLog.Errorf("Failed to delete Extra Service<%s>: %v", service.Name, err)
			return common.NewRequeueError(err.Error())
		}
	}
	return nil
}
// ListWorkLoads lists the Deployments in namespace whose labels match every
// key/value pair in selectLabels.
//
// A selector-construction or list failure is logged and returned as a requeue
// error together with a nil list.
func (d *DeploymentHandler) ListWorkLoads(
	ctx context.Context,
	selectLabels map[string]string,
	namespace string) (*appsv1.DeploymentList, error) {
	labelSelector := metav1.LabelSelector{MatchLabels: selectLabels}
	selector, err := metav1.LabelSelectorAsSelector(&labelSelector)
	if err != nil {
		hwlog.RunLog.Errorf("Failed to create selector: %v", err)
		return nil, common.NewRequeueError(err.Error())
	}

	listOpts := []client.ListOption{
		client.MatchingLabelsSelector{Selector: selector},
		client.InNamespace(namespace),
	}
	result := &appsv1.DeploymentList{}
	if listErr := d.client.List(ctx, result, listOpts...); listErr != nil {
		hwlog.RunLog.Errorf("Failed to list Deployments: %v", listErr)
		return nil, common.NewRequeueError(listErr.Error())
	}
	return result, nil
}
// Validate checks that spec can be decoded into a DeploymentSpec and returns
// the decoding error, if any.
func (d *DeploymentHandler) Validate(spec runtime.RawExtension) error {
	_, parseErr := d.parseDeploymentWithScheme(spec)
	return parseErr
}
// GetReplicas decodes spec as a DeploymentSpec and returns its replica count.
// It returns common.DefaultReplicas when the spec leaves replicas unset, and
// common.DefaultReplicas together with the error when decoding fails.
func (d *DeploymentHandler) GetReplicas(spec runtime.RawExtension) (int32, error) {
	parsed, err := d.parseDeploymentWithScheme(spec)
	if err != nil {
		return common.DefaultReplicas, err
	}
	if parsed.Replicas != nil {
		return *parsed.Replicas, nil
	}
	return common.DefaultReplicas, nil
}
// isDeploymentReady reports whether deployment has fully rolled out.
//
// It returns true only when the status has observed the current generation,
// ReadyReplicas, AvailableReplicas and UpdatedReplicas all equal the desired
// replica count (Spec.Replicas, or 1 when unset), and both the Available and
// Progressing conditions are present with status True.
func isDeploymentReady(deployment appsv1.Deployment) bool {
	desiredReplicas := int32(1)
	if deployment.Spec.Replicas != nil {
		desiredReplicas = *deployment.Spec.Replicas
	}

	// The status still describes an older spec; its counters cannot be trusted yet.
	if deployment.Generation > 0 && deployment.Status.ObservedGeneration < deployment.Generation {
		hwlog.RunLog.Warnf("Deployment %s/%s is not latest", deployment.Namespace, deployment.Name)
		return false
	}

	if deployment.Status.ReadyReplicas != desiredReplicas ||
		deployment.Status.AvailableReplicas != desiredReplicas ||
		deployment.Status.UpdatedReplicas != desiredReplicas {
		return false
	}

	available := getDeploymentCondition(deployment.Status.Conditions, appsv1.DeploymentAvailable)
	if available == nil || available.Status != corev1.ConditionTrue {
		hwlog.RunLog.Warnf("Deployment %s/%s is not available, Condition<%s> is not true",
			deployment.Namespace, deployment.Name, appsv1.DeploymentAvailable)
		return false
	}

	progressing := getDeploymentCondition(deployment.Status.Conditions, appsv1.DeploymentProgressing)
	if progressing == nil || progressing.Status != corev1.ConditionTrue {
		// Report the condition that was actually checked (Progressing, not Available).
		hwlog.RunLog.Warnf("Deployment %s/%s is not progressing, Condition<%s> is not true",
			deployment.Namespace, deployment.Name, appsv1.DeploymentProgressing)
		return false
	}
	return true
}
// getDeploymentCondition returns a pointer to the first element of conditions
// whose Type equals condType, or nil when there is none. The pointer refers to
// the element inside the caller's slice, not to a copy.
func getDeploymentCondition(
	conditions []appsv1.DeploymentCondition,
	condType appsv1.DeploymentConditionType) *appsv1.DeploymentCondition {
	for idx := 0; idx < len(conditions); idx++ {
		candidate := &conditions[idx]
		if candidate.Type != condType {
			continue
		}
		return candidate
	}
	return nil
}
// parseDeploymentWithScheme decodes the JSON bytes carried by raw into a
// DeploymentSpec. It returns an error when raw holds no bytes or when the JSON
// cannot be unmarshalled.
func (d *DeploymentHandler) parseDeploymentWithScheme(raw runtime.RawExtension) (*appsv1.DeploymentSpec, error) {
	payload := raw.Raw
	if len(payload) == 0 {
		return nil, fmt.Errorf("raw extension is empty")
	}

	parsed := new(appsv1.DeploymentSpec)
	if err := json.Unmarshal(payload, parsed); err != nil {
		return nil, fmt.Errorf("failed to unmarshal RawExtension to DeploymentSpec: %w", err)
	}
	return parsed, nil
}
// ListWorkLoad lists the Deployments in namespace matching selectLabels and
// returns, wrapped as DeploymentWorkLoad values, those accepted by every filter.
// With no filters, every listed Deployment is returned.
//
// Listing is delegated to ListWorkLoads, so list failures are logged there and
// come back as requeue errors. The returned slice is non-nil on success.
func (d *DeploymentHandler) ListWorkLoad(
	ctx context.Context,
	selectLabels map[string]string,
	namespace string,
	filters ...WorkLoadFilter) ([]WorkLoadInterface, error) {
	deployList, err := d.ListWorkLoads(ctx, selectLabels, namespace)
	if err != nil {
		return nil, err
	}

	workloads := make([]WorkLoadInterface, 0, len(deployList.Items))
	for i := range deployList.Items {
		// One copy per item so each wrapper owns its Deployment independently of the list.
		item := deployList.Items[i]
		workload := &DeploymentWorkLoad{Deployment: &item}

		accepted := true
		for _, filter := range filters {
			if !filter(workload) {
				accepted = false
				break
			}
		}
		if accepted {
			workloads = append(workloads, workload)
		}
	}
	return workloads, nil
}
// DeleteWorkLoad deletes the Deployments in namespace that match selectLabels
// and are accepted by every filter. With no filters, every match is deleted.
//
// All filters are evaluated before the first delete is issued. A Deployment
// that is already gone when its delete is issued is treated as deleted; any
// other failure stops the loop and is returned wrapped with the Deployment's
// namespace and name.
func (d *DeploymentHandler) DeleteWorkLoad(
	ctx context.Context,
	selectLabels map[string]string,
	namespace string,
	filters ...WorkLoadFilter) error {
	deployList, err := d.ListWorkLoads(ctx, selectLabels, namespace)
	if err != nil {
		return fmt.Errorf("failed to list deployment work loads: %w", err)
	}

	targets := make([]*DeploymentWorkLoad, 0, len(deployList.Items))
	for i := range deployList.Items {
		item := deployList.Items[i]
		workload := &DeploymentWorkLoad{Deployment: &item}

		accepted := true
		for _, filter := range filters {
			if !filter(workload) {
				accepted = false
				break
			}
		}
		if accepted {
			targets = append(targets, workload)
		}
	}

	for _, workload := range targets {
		delErr := d.client.Delete(ctx, workload.Deployment)
		if delErr == nil || errors.IsNotFound(delErr) {
			// NotFound means the object is already gone, which is the desired state.
			continue
		}
		return fmt.Errorf("failed to delete deployment work load %s/%s: %w",
			workload.Namespace, workload.Name, delErr)
	}
	return nil
}
// UpdateWorkLoad applies updater to the Deployments in namespace that match
// selectLabels and are accepted by every filter, then writes each one back
// through the client.
//
// All filters are evaluated before the first update is issued. The first
// failing update stops the loop and is returned wrapped with the Deployment's
// namespace and name.
func (d *DeploymentHandler) UpdateWorkLoad(
	ctx context.Context,
	selectLabels map[string]string,
	namespace string,
	updater WorkloadUpdater,
	filters ...WorkLoadFilter) error {
	listed, listErr := d.ListWorkLoads(ctx, selectLabels, namespace)
	if listErr != nil {
		return fmt.Errorf("failed to list deployment work loads: %w", listErr)
	}

	// accepts reports whether every filter approves the candidate.
	accepts := func(candidate *DeploymentWorkLoad) bool {
		for _, filter := range filters {
			if !filter(candidate) {
				return false
			}
		}
		return true
	}

	var selected []*DeploymentWorkLoad
	for idx := range listed.Items {
		item := listed.Items[idx]
		candidate := &DeploymentWorkLoad{Deployment: &item}
		if accepts(candidate) {
			selected = append(selected, candidate)
		}
	}

	for _, target := range selected {
		updater(target)
		updateErr := d.client.Update(ctx, target.Deployment)
		if updateErr != nil {
			return fmt.Errorf("failed to update deployment work load %s/%s: %w", target.Namespace, target.Name, updateErr)
		}
	}
	return nil
}