/* Copyright(C) 2023-2025. Huawei Technologies Co.,Ltd. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package v1 is used to reconcile AscendJob.
*/
package v1
import (
"context"
"errors"
"fmt"
"strconv"
"time"
commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/controller.v1/common"
"github.com/kubeflow/common/pkg/controller.v1/control"
"github.com/kubeflow/common/pkg/util"
"golang.org/x/time/rate"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
k8serr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
"volcano.sh/apis/pkg/apis/batch/v1alpha1"
"volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/apis/pkg/client/clientset/versioned"
"volcano.sh/apis/pkg/client/informers/externalversions"
"ascend-common/api"
"ascend-common/common-utils/hwlog"
mindxdlv1 "ascend-operator/pkg/api/v1"
"ascend-operator/pkg/controllers/scaling"
"ascend-operator/pkg/ranktable"
"ascend-operator/pkg/ranktable/generator"
"ascend-operator/pkg/ranktable/utils"
)
// NewReconciler builds an ASJobReconciler from the manager's client, scheme,
// API reader and event recorder, and embeds a kubeflow common.JobController
// configured with the given gang-scheduling switch.
//
// It creates Kubernetes and Volcano clientsets from the manager's rest config
// (the *OrDie constructors panic on an unusable config). It then starts the
// Volcano informer factory so the PodGroup lister handed to the scaler is
// backed by a running informer.
//
// NOTE(review): the core shared informer factory that backs PodLister,
// ServiceLister and PriorityClassLister is never started in this function.
// Confirm it is started elsewhere; otherwise those listers stay empty.
func NewReconciler(mgr manager.Manager, enableGangScheduling bool) *ASJobReconciler {
	reconciler := &ASJobReconciler{
		Client:        mgr.GetClient(),
		Scheme:        mgr.GetScheme(),
		apiReader:     mgr.GetAPIReader(),
		recorder:      mgr.GetEventRecorderFor(api.ControllerName),
		versions:      make(map[types.UID]int32),
		backoffLimits: make(map[types.UID]int32),
		rtGenerators:  make(map[types.UID]generator.RankTableGenerator),
	}

	restCfg := mgr.GetConfig()
	kubeCS := kubernetes.NewForConfigOrDie(restCfg)
	volcanoCS := versioned.NewForConfigOrDie(restCfg)

	// The PodGroup lister must be requested before Start: a shared informer
	// factory only runs the informers that were requested before Start is called.
	volcanoFactory := externalversions.NewSharedInformerFactory(volcanoCS, 0)
	podGroupLister := volcanoFactory.Scheduling().V1beta1().PodGroups().Lister()
	volcanoFactory.Start(wait.NeverStop)

	coreFactory := informers.NewSharedInformerFactory(kubeCS, 0)
	priorityClasses := coreFactory.Scheduling().V1beta1().PriorityClasses()

	reconciler.scaler = scaling.New(kubeCS, podGroupLister)
	reconciler.JobController = common.JobController{
		Controller:                  reconciler,
		Config:                      common.JobControllerConfiguration{EnableGangScheduling: enableGangScheduling},
		WorkQueue:                   workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		Recorder:                    reconciler.recorder,
		PodLister:                   coreFactory.Core().V1().Pods().Lister(),
		ServiceLister:               coreFactory.Core().V1().Services().Lister(),
		KubeClientSet:               kubeCS,
		VolcanoClientSet:            volcanoCS,
		PriorityClassLister:         priorityClasses.Lister(),
		PriorityClassInformerSynced: priorityClasses.Informer().HasSynced,
		PodControl:                  control.RealPodControl{KubeClient: kubeCS, Recorder: reconciler.recorder},
		ServiceControl:              control.RealServiceControl{KubeClient: kubeCS, Recorder: reconciler.recorder},
	}
	return reconciler
}
// ASJobReconciler reconciles AscendJob objects. For Deployments, StatefulSets
// and Volcano Jobs that reach Reconcile, it drives rank-table generation
// instead (see isJobDecorator).
//
// NOTE(review): versions, backoffLimits and rtGenerators are plain maps. They
// are written both from watch predicates (ascendJobCreateFunc,
// onOwnerDeleteFunc, onPodDeleteFunc) and from Reconcile, and no lock is
// visible in this file. Confirm these never run concurrently; otherwise a
// mutex is needed.
type ASJobReconciler struct {
// JobController is the embedded kubeflow common job controller.
common.JobController
// Client is the manager's client, used for Get/Update/Patch/Delete in this file.
client.Client
// Scheme is the manager's scheme; Reconcile applies its defaults to AscendJobs.
Scheme *runtime.Scheme
// recorder emits Kubernetes events on behalf of this controller.
recorder record.EventRecorder
// apiReader reads straight from the API server (used by GetJobFromAPIClient).
apiReader client.Reader
// scaler is built from the kube clientset and the Volcano PodGroup lister.
scaler *scaling.Controller
// versions maps an owner UID to its current pod version; it is bumped when a
// pod carrying that version is deleted.
versions map[types.UID]int32
// backoffLimits maps a job UID to its retry limit, taken from
// RunPolicy.BackoffLimit or the fault-retry-times label.
backoffLimits map[types.UID]int32
// rtGenerators maps an owner UID to the rank-table generator created for it.
rtGenerators map[types.UID]generator.RankTableGenerator
// batchMgr is not used in this file; TODO(review) document its role.
batchMgr batchCreateManager
}
// Reconcile is the controller-runtime entry point for one request key.
//
// Keys that resolve to a Deployment, StatefulSet or Volcano Job are handed to
// rank-table generation and processing stops there. Otherwise the key is
// treated as an AscendJob, which goes through these steps:
//   - A validation failure marks the job Failed and persists the status.
//   - A job with a deletion timestamp is skipped after its cached pod version
//     and backoff limit are dropped.
//   - Otherwise the job is defaulted via the scheme and passed to ReconcileJobs.
//
// Update conflicts are requeued without surfacing an error.
func (r *ASJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	if r == nil {
		return ctrl.Result{}, errors.New("nil pointer")
	}
	if r.isJobDecorator(ctx, req) {
		return ctrl.Result{}, nil
	}

	job := &mindxdlv1.AscendJob{}
	if getErr := r.Get(ctx, req.NamespacedName, job); getErr != nil {
		switch {
		case k8serr.IsNotFound(getErr):
			hwlog.RunLog.Debugf("unable to fetch Job<%s>, err: %s", req.NamespacedName, getErr)
		default:
			hwlog.RunLog.Errorf("unable to fetch Job<%s>, err: %s", req.NamespacedName, getErr)
		}
		return ctrl.Result{}, client.IgnoreNotFound(getErr)
	}

	if invalid := r.validateJob(job); invalid != nil {
		hwlog.RunLog.Errorf("Job<%s> failed validation, err: %v", req.NamespacedName, invalid)
		condMsg := fmt.Sprintf("%s: %s", invalid.reason, invalid.message)
		if condErr := util.UpdateJobConditions(&job.Status, commonv1.JobFailed, jobValidFailedReason,
			condMsg); condErr != nil {
			return ctrl.Result{}, condErr
		}
		return ctrl.Result{}, r.UpdateJobStatusInApiServer(job, &job.Status)
	}

	if job.GetDeletionTimestamp() != nil {
		hwlog.RunLog.Infof("reconcile cancelled,job<%s> has been deleted", req.NamespacedName)
		delete(r.versions, job.UID)
		delete(r.backoffLimits, job.UID)
		return ctrl.Result{}, nil
	}

	r.Scheme.Default(job)
	reconcileErr := r.ReconcileJobs(job, job.Spec.ReplicaSpecs, job.Status, &job.Spec.RunPolicy)
	switch {
	case reconcileErr == nil:
		return ctrl.Result{}, nil
	case k8serr.IsConflict(reconcileErr):
		return ctrl.Result{Requeue: true}, nil
	default:
		hwlog.RunLog.Warnf("Reconcile Job<%s> failed err: %s", req.NamespacedName, reconcileErr)
		return ctrl.Result{}, reconcileErr
	}
}
// isJobDecorator reports whether req names an existing Deployment, StatefulSet
// or Volcano Job, probed in that order. On the first successful Get it runs
// the rank-table pipeline for that object and returns true. Any Get error is
// treated as "not this kind".
func (r *ASJobReconciler) isJobDecorator(ctx context.Context, req ctrl.Request) bool {
	candidates := [...]client.Object{
		&appv1.Deployment{},
		&appv1.StatefulSet{},
		&v1alpha1.Job{},
	}
	for i := range candidates {
		if r.Get(ctx, req.NamespacedName, candidates[i]) != nil {
			continue
		}
		r.ranktablePipeline(candidates[i])
		return true
	}
	return false
}
// ranktablePipeline converts a decorated workload object into an AscendJob
// view. When that view requests NPUs, it builds the job info and generates the
// rank table. Every failure is logged and ends the pipeline; nothing is
// returned to the caller.
func (r *ASJobReconciler) ranktablePipeline(obj metav1.Object) {
	job, extractErr := ranktable.GetAscendJobFromObject(obj)
	if extractErr != nil {
		hwlog.RunLog.Errorf("extract config failed: %v", extractErr)
		return
	}

	// Jobs that request no NPUs need no rank table.
	if getJobRequiredNpu(job) == 0 {
		hwlog.RunLog.Debugf("job <%s> does not require NPU, skip ranktable generation", job.Name)
		return
	}

	info, infoErr := r.newJobInfo(job, job.Spec.ReplicaSpecs, &job.Status, &job.Spec.RunPolicy)
	if infoErr != nil {
		hwlog.RunLog.Errorf("failed to generate ranktable for job<%s>, err: %v", job.Name, infoErr)
		return
	}
	r.genRankTable(info)
}
// SetupWithManager creates the controller with a combined rate limiter and
// registers all watches. The limiter takes the max of per-item exponential
// backoff and a global token bucket. Returns an error on a nil receiver or
// when controller creation or any watch registration fails.
func (r *ASJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
	if r == nil {
		return errors.New("nil pointer")
	}
	limiter := workqueue.NewMaxOfRateLimiter(
		workqueue.NewItemExponentialFailureRateLimiter(workQueueBaseDelay, workQueueMaxDelay),
		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(workQueueQps), workQueueBurst)},
	)
	opts := controller.Options{
		Reconciler:  r,
		RateLimiter: limiter,
	}
	ctrlr, err := controller.New(r.ControllerName(), mgr, opts)
	if err != nil {
		return err
	}
	return r.watchRelatedResource(ctrlr, mgr)
}
// resourceOption pairs a watch source with the predicates applied to its events.
// A zero-valued predicateFunc (all fields nil) lets every event through.
type resourceOption struct {
// kind is the resource type to watch.
kind *source.Kind
// predicateFunc filters events for kind before they are enqueued.
predicateFunc predicate.Funcs
}
// watchRelatedResource registers, in order, the AscendJob, Volcano Job,
// Deployment and StatefulSet watches on c. It stops at and returns the first
// registration error.
func (r *ASJobReconciler) watchRelatedResource(c controller.Controller, mgr ctrl.Manager) error {
	registrations := []func(controller.Controller, ctrl.Manager) error{
		r.watchAscendJobRelatedResource,
		r.watchVolcanoJobRelatedResource,
		r.watchDeploymentRelatedResource,
		r.watchStatefulSetRelatedResource,
	}
	for _, register := range registrations {
		if err := register(c, mgr); err != nil {
			return err
		}
	}
	return nil
}
// watchAscendJobRelatedResource watches AscendJobs (create/delete go through
// the owner predicates) and the resources they control: Pods with
// delete/update predicates, Services, and PodGroups when gang scheduling is
// enabled. Owned resources are enqueued under their controlling AscendJob.
// Returns an error when gang scheduling is enabled but the PodGroup kind
// cannot be REST-mapped, or when any Watch call fails.
func (r *ASJobReconciler) watchAscendJobRelatedResource(c controller.Controller, mgr ctrl.Manager) error {
	ownerPredicates := predicate.Funcs{CreateFunc: r.onOwnerCreateFunc(), DeleteFunc: r.onOwnerDeleteFunc()}
	if err := c.Watch(&source.Kind{Type: &mindxdlv1.AscendJob{}}, &handler.EnqueueRequestForObject{},
		ownerPredicates); err != nil {
		return err
	}

	owned := []*resourceOption{
		{
			kind:          &source.Kind{Type: &corev1.Pod{}},
			predicateFunc: predicate.Funcs{DeleteFunc: r.onPodDeleteFunc(), UpdateFunc: r.onPodUpdateFunc()},
		},
		{kind: &source.Kind{Type: &corev1.Service{}}},
	}

	if r.Config.EnableGangScheduling {
		podGroupKind := schema.GroupKind{Group: v1beta1.SchemeGroupVersion.Group, Kind: "PodGroup"}
		if _, mapErr := mgr.GetRESTMapper().RESTMapping(podGroupKind, v1beta1.SchemeGroupVersion.Version); mapErr != nil {
			hwlog.RunLog.Errorf("enableGangScheduling is true, but PodGroup is not in cluster")
			return mapErr
		}
		owned = append(owned, &resourceOption{kind: &source.Kind{Type: &v1beta1.PodGroup{}}})
	}

	for _, opt := range owned {
		// A fresh handler per Watch, as each registration configures its own instance.
		byOwner := &handler.EnqueueRequestForOwner{IsController: true, OwnerType: &mindxdlv1.AscendJob{}}
		if err := c.Watch(opt.kind, byOwner, opt.predicateFunc); err != nil {
			return err
		}
	}
	return nil
}
// watchVolcanoJobRelatedResource watches Volcano Jobs (create/delete go
// through the owner predicates). It also enqueues Pods under their
// controlling Volcano Job.
func (r *ASJobReconciler) watchVolcanoJobRelatedResource(c controller.Controller, mgr ctrl.Manager) error {
	ownerPredicates := predicate.Funcs{CreateFunc: r.onOwnerCreateFunc(), DeleteFunc: r.onOwnerDeleteFunc()}
	if err := c.Watch(&source.Kind{Type: &v1alpha1.Job{}}, &handler.EnqueueRequestForObject{},
		ownerPredicates); err != nil {
		return err
	}
	podsByJob := &handler.EnqueueRequestForOwner{IsController: true, OwnerType: &v1alpha1.Job{}}
	return c.Watch(&source.Kind{Type: &corev1.Pod{}}, podsByJob)
}
// watchDeploymentRelatedResource registers the watches used to generate rank
// tables for Deployments:
//   - Deployment create/delete events go through the owner predicates and are
//     enqueued under the Deployment's own key.
//   - Pods whose controlling owner is a ReplicaSet are mapped back to the
//     Deployment named by their deployLabelKey label.
//
// ReplicaSet-controlled pods that lack the deployLabelKey label, or carry an
// empty one, are dropped. Enqueueing them would produce a request with an
// empty name, which can never match a Deployment and only wastes a reconcile.
func (r *ASJobReconciler) watchDeploymentRelatedResource(c controller.Controller, mgr ctrl.Manager) error {
	if err := c.Watch(&source.Kind{Type: &appv1.Deployment{}}, &handler.EnqueueRequestForObject{},
		predicate.Funcs{CreateFunc: r.onOwnerCreateFunc(), DeleteFunc: r.onOwnerDeleteFunc()},
	); err != nil {
		return err
	}
	return c.Watch(&source.Kind{Type: &corev1.Pod{}},
		handler.EnqueueRequestsFromMapFunc(func(object client.Object) []reconcile.Request {
			deployPod := false
			for _, owner := range object.GetOwnerReferences() {
				if owner.Controller != nil && *owner.Controller && owner.Kind == "ReplicaSet" {
					deployPod = true
					break
				}
			}
			if !deployPod {
				return nil
			}
			deployName := object.GetLabels()[deployLabelKey]
			if deployName == "" {
				return nil
			}
			return []reconcile.Request{
				{NamespacedName: types.NamespacedName{
					Name:      deployName,
					Namespace: object.GetNamespace(),
				}},
			}
		}),
	)
}
// watchStatefulSetRelatedResource watches StatefulSets (create/delete go
// through the owner predicates). It also enqueues Pods under their
// controlling StatefulSet.
func (r *ASJobReconciler) watchStatefulSetRelatedResource(c controller.Controller, mgr ctrl.Manager) error {
	ownerPredicates := predicate.Funcs{CreateFunc: r.onOwnerCreateFunc(), DeleteFunc: r.onOwnerDeleteFunc()}
	if err := c.Watch(&source.Kind{Type: &appv1.StatefulSet{}}, &handler.EnqueueRequestForObject{},
		ownerPredicates); err != nil {
		return err
	}
	podsBySts := &handler.EnqueueRequestForOwner{IsController: true, OwnerType: &appv1.StatefulSet{}}
	return c.Watch(&source.Kind{Type: &corev1.Pod{}}, podsBySts)
}
// onOwnerCreateFunc returns the create predicate shared by all owner kinds.
// The predicate behaves as follows:
//   - Objects with no rank-table plugin are handled as AscendJobs
//     (ascendJobCreateFunc).
//   - Plugin-backed objects that should not get a rank table are filtered out.
//   - Otherwise a rank-table generator is registered under the object's UID
//     and the event is let through.
//
// Extraction failures are logged and the event is dropped.
func (r *ASJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
	return func(e event.CreateEvent) bool {
		obj := e.Object
		plugin := ranktable.FindPluginForObject(obj)
		switch {
		case plugin == nil:
			return r.ascendJobCreateFunc(e)
		case !plugin.ShouldGenerateRankTable(obj):
			return false
		}

		job, err := plugin.ExtractObjToAscendJob(obj)
		if err != nil {
			hwlog.RunLog.Errorf("extract config failed: %v", err)
			return false
		}
		r.rtGenerators[obj.GetUID()] = ranktable.NewGenerator(job)
		hwlog.RunLog.Infof("create rtGenerator for %s %s", job.TypeMeta.Kind, job.Name)
		return true
	}
}
// ascendJobCreateFunc handles the create event of an AscendJob. Objects that
// are not AscendJobs are let through unchanged. For an AscendJob it:
//   - appends a JobCreated condition to the event object's status,
//   - seeds the job's pod version (defaultPodVersion),
//   - seeds the backoff limit from RunPolicy.BackoffLimit or, if that is
//     unset, from the fault-retry-times label,
//   - registers a rank-table generator when the job's framework can be
//     determined.
//
// Returns false (dropping the event) if the condition update or label parsing
// fails.
func (r *ASJobReconciler) ascendJobCreateFunc(e event.CreateEvent) bool {
	job, isAscendJob := e.Object.(*mindxdlv1.AscendJob)
	if !isAscendJob {
		return true
	}

	created := fmt.Sprintf("Job %s is create.", e.Object.GetName())
	hwlog.RunLog.Info(created)
	if condErr := util.UpdateJobConditions(&job.Status, commonv1.JobCreated, "JobCreated", created); condErr != nil {
		log.Log.Error(condErr, "append job condition error")
		return false
	}

	uid := job.UID
	r.versions[uid] = defaultPodVersion
	r.backoffLimits[uid] = unsetBackoffLimits
	if limit := job.Spec.RunPolicy.BackoffLimit; limit != nil {
		r.backoffLimits[uid] = *limit
	} else if retryErr := r.setFaultRetryTimesToBackoffLimits(job); retryErr != nil {
		hwlog.RunLog.Errorf("failed to get fault-retry-times, error: %v", retryErr)
		return false
	}

	frame, frameErr := mindxdlv1.GetJobFramework(job)
	if frameErr != nil {
		// No recognizable framework: no rank-table generator for this job.
		return true
	}
	r.rtGenerators[uid] = ranktable.NewGenerator(job)
	hwlog.RunLog.Infof("create rtGenerator for frame %s Job %s", frame, job.Name)
	return true
}
// setFaultRetryTimesToBackoffLimits reads the job's labelFaultRetryTimes label.
// When the label is present and non-empty, it stores the value as the job's
// entry in r.backoffLimits. A missing or empty label leaves the map untouched
// and returns nil.
//
// The value is parsed directly as a 32-bit decimal integer. Values outside the
// int32 range are therefore rejected with an error instead of silently
// wrapping when narrowed to int32.
func (r *ASJobReconciler) setFaultRetryTimesToBackoffLimits(ascendJob *mindxdlv1.AscendJob) error {
	const (
		decimalBase = 10
		int32Bits   = 32
	)
	// Indexing a nil label map yields "", so no separate length check is needed.
	value := ascendJob.ObjectMeta.Labels[labelFaultRetryTimes]
	if value == "" {
		return nil
	}
	faultRetryTimes, err := strconv.ParseInt(value, decimalBase, int32Bits)
	if err != nil {
		hwlog.RunLog.Errorf("failed to convert string to int, error: %v", err)
		return err
	}
	r.backoffLimits[ascendJob.UID] = int32(faultRetryTimes)
	return nil
}
// onOwnerDeleteFunc returns the delete predicate shared by all owner kinds.
// For any owner with a registered rank-table generator, it deletes the
// generator's file (errors are only logged) and forgets the generator. The
// event is then let through only for AscendJobs, whose cached pod version and
// backoff limit are dropped as well.
func (r *ASJobReconciler) onOwnerDeleteFunc() func(deleteEvent event.DeleteEvent) bool {
	return func(e event.DeleteEvent) bool {
		uid := e.Object.GetUID()
		if gen, registered := r.rtGenerators[uid]; registered {
			if err := gen.DeleteFile(); err != nil {
				hwlog.RunLog.Errorf("failed to delete ranktable, err: %v", err)
			}
			delete(r.rtGenerators, uid)
		}

		job, isAscendJob := e.Object.(*mindxdlv1.AscendJob)
		if !isAscendJob {
			return false
		}
		hwlog.RunLog.Info(fmt.Sprintf("Job %s is deleted.", e.Object.GetName()))
		delete(r.versions, job.UID)
		delete(r.backoffLimits, job.UID)
		return true
	}
}
// onPodDeleteFunc returns the delete predicate for pods owned by AscendJobs.
//   - Pods without a replica-type label, without a pod-version label, or with
//     a non-numeric version are filtered out.
//   - Pods with no controller are let through untouched.
//   - Otherwise, if the deleted pod's version equals the version currently
//     tracked for its controller, that tracked version is incremented.
//
// The hot-switch bookkeeping then runs and the event is let through.
func (r *ASJobReconciler) onPodDeleteFunc() func(event.DeleteEvent) bool {
	return func(e event.DeleteEvent) bool {
		labels := e.Object.GetLabels()
		if labels[commonv1.ReplicaTypeLabel] == "" {
			return false
		}
		version := labels[podVersionLabel]
		if version == "" {
			return false
		}
		versionNumber, err := strconv.Atoi(version)
		if err != nil {
			hwlog.RunLog.Errorf("failed to convert string to int, err: %v", err)
			return false
		}
		hwlog.RunLog.Infof("deleted pod <%s> version is: %v", e.Object.GetName(), version)

		owner := metav1.GetControllerOf(e.Object)
		if owner == nil {
			return true
		}
		if current, tracked := r.versions[owner.UID]; tracked && int32(versionNumber) == current {
			r.versions[owner.UID]++
		}
		r.handleHotswitchPodDelete(e)
		return true
	}
}
// handleHotswitchPodDelete dispatches a pod deletion to the hot-switch
// handlers:
//   - A backup pod (api.PodTypeKey == api.PodTypeBackup) goes to
//     handleNewPodDeleted.
//   - A pod marked as in the hot-switch flow goes to handleOldPodDeleted.
//
// Non-Pod objects and other pods are ignored.
func (r *ASJobReconciler) handleHotswitchPodDelete(e event.DeleteEvent) {
	pod, isPod := e.Object.(*corev1.Pod)
	if !isPod {
		return
	}
	switch {
	case pod.Annotations[api.PodTypeKey] == api.PodTypeBackup:
		handleNewPodDeleted(pod, r)
	case pod.Annotations[api.InHotSwitchFlowKey] == api.InHotSwitchFlowValue:
		handleOldPodDeleted(pod, r)
	}
}
// handleOldPodDeleted runs after the original ("old") pod of a hot switch has
// been deleted. It does two things:
//   - Removes the api.PodTypeKey and api.BackupSourcePodNameKey annotations
//     from the replacement pod named by the old pod's api.BackupNewPodNameKey
//     annotation.
//   - Resets the status, file status and configmap status of the owning job's
//     rank-table generator to utils.InitialRTStatus (presumably so the rank
//     table is regenerated — confirm).
//
// Every failure is logged and aborts the remaining steps.
func handleOldPodDeleted(pod *v1.Pod, r *ASJobReconciler) {
	if r == nil {
		hwlog.RunLog.Errorf("hotswitch: reconciler is nil")
		return
	}
	hwlog.RunLog.Infof("hotswitch: old pod deleted,podName: %s", pod.Name)
	newPodName := pod.Annotations[api.BackupNewPodNameKey]
	if newPodName == "" {
		// Without the annotation there is no replacement pod to look up.
		hwlog.RunLog.Errorf("hotswitch: old pod %s/%s has no new pod name annotation", pod.Namespace, pod.Name)
		return
	}
	ctx := context.TODO()
	newPod := &v1.Pod{}
	if err := r.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: newPodName}, newPod); err != nil {
		// newPod is still empty here, so log the name we looked up, not newPod.Name.
		hwlog.RunLog.Errorf("hotswitch: could not find newPod: %s/%s, err: %v", pod.Namespace, newPodName, err)
		return
	}
	original := newPod.DeepCopy()
	delete(newPod.Annotations, api.PodTypeKey)
	delete(newPod.Annotations, api.BackupSourcePodNameKey)
	if err := r.Patch(ctx, newPod, client.MergeFrom(original)); err != nil {
		hwlog.RunLog.Errorf("hotswitch: delete annotations[podType、backupSourcePodName] failed, pod: %s/%s,err:%v",
			newPod.Namespace, newPod.Name, err)
		return
	}
	hwlog.RunLog.Infof("hotswitch: delete annotations[podType、backupSourcePodName] success, pod: %s/%s",
		newPod.Namespace, newPod.Name)
	jobId := getJobKeyByPod(pod)
	rtg, ok := r.rtGenerators[jobId]
	if !ok {
		hwlog.RunLog.Warnf("rank table generator not found for job %s", jobId)
		return
	}
	rtg.SetStatus(utils.InitialRTStatus)
	rtg.SetFileStatus(utils.InitialRTStatus)
	rtg.SetConfigmapStatus(utils.InitialRTStatus)
}
// handleNewPodDeleted runs after the backup ("new") pod of a hot switch has
// been deleted. It removes the api.InHotSwitchFlowKey and
// api.BackupNewPodNameKey annotations from the source pod named by the backup
// pod's api.BackupSourcePodNameKey annotation. Every failure is logged and
// aborts the remaining steps.
func handleNewPodDeleted(pod *v1.Pod, r *ASJobReconciler) {
	if r == nil {
		hwlog.RunLog.Errorf("hotswitch: reconciler is nil")
		return
	}
	ctx := context.TODO()
	hwlog.RunLog.Infof("hotswitch: new pod deleted,podName: %s", pod.Name)
	oldPodName := pod.Annotations[api.BackupSourcePodNameKey]
	if oldPodName == "" {
		// Without the annotation there is no source pod to look up.
		hwlog.RunLog.Errorf("hotswitch: new pod %s/%s has no source pod name annotation", pod.Namespace, pod.Name)
		return
	}
	oldPod := &v1.Pod{}
	if err := r.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: oldPodName}, oldPod); err != nil {
		hwlog.RunLog.Errorf("hotswitch: get old pod %s/%s err: %v", pod.Namespace, oldPodName, err)
		return
	}
	original := oldPod.DeepCopy()
	delete(oldPod.Annotations, api.InHotSwitchFlowKey)
	delete(oldPod.Annotations, api.BackupNewPodNameKey)
	if err := r.Patch(ctx, oldPod, client.MergeFrom(original)); err != nil {
		hwlog.RunLog.Errorf("hotswitch: delete annotations[inHotSwitchFlow、backupNewPodName] failed, pod: %s/%s,err:%v",
			oldPod.Namespace, oldPod.Name, err)
		return
	}
	hwlog.RunLog.Infof("hotswitch: delete annotations[inHotSwitchFlow、backupNewPodName] success, pod: %s/%s",
		oldPod.Namespace, oldPod.Name)
}
// ControllerName returns the name this controller registers under.
func (r *ASJobReconciler) ControllerName() string {
	name := api.ControllerName
	return name
}

// GetAPIGroupVersionKind returns the GroupVersionKind of AscendJob.
func (r *ASJobReconciler) GetAPIGroupVersionKind() schema.GroupVersionKind {
	return r.GetAPIGroupVersion().WithKind(api.AscendJobKind)
}

// GetAPIGroupVersion returns the API group/version that AscendJob belongs to.
func (r *ASJobReconciler) GetAPIGroupVersion() schema.GroupVersion {
	gv := mindxdlv1.GroupVersion
	return gv
}

// GetGroupNameLabelValue returns the AscendJob API group name, used as a label value.
func (r *ASJobReconciler) GetGroupNameLabelValue() string {
	return r.GetAPIGroupVersion().Group
}
// GetJobFromInformerCache fetches the AscendJob namespace/name through the
// manager client. It always returns the (possibly empty) AscendJob object
// together with the Get error.
func (r *ASJobReconciler) GetJobFromInformerCache(namespace, name string) (metav1.Object, error) {
	key := types.NamespacedName{Namespace: namespace, Name: name}
	job := &mindxdlv1.AscendJob{}
	getErr := r.Get(context.Background(), key, job)
	return job, getErr
}
// GetJobFromAPIClient fetches the AscendJob namespace/name through apiReader,
// bypassing the client used elsewhere. A not-found error is logged as a
// warning and any other error as an error. On failure it returns a nil object
// and the error.
func (r *ASJobReconciler) GetJobFromAPIClient(namespace, name string) (metav1.Object, error) {
	key := types.NamespacedName{Namespace: namespace, Name: name}
	job := &mindxdlv1.AscendJob{}
	err := r.apiReader.Get(context.Background(), key, job)
	switch {
	case err == nil:
		return job, nil
	case k8serr.IsNotFound(err):
		hwlog.RunLog.Warnf("Job<%s/%s> not found, err: %s", namespace, name, err)
	default:
		hwlog.RunLog.Errorf("failed to get Job<%s/%s> from api-server. err: %s", namespace, name, err)
	}
	return nil, err
}
// DeleteJob deletes the given AscendJob through the manager client.
//
// On failure it records a Warning event (FailedDeleteJobReason), logs the
// error and returns it. On success it records a Normal event
// (SuccessfulDeleteJobReason). A non-AscendJob argument yields an error that
// names the argument's dynamic type. Previously this error formatted the nil
// result of the failed type assertion, so it always read "<nil>".
func (r *ASJobReconciler) DeleteJob(job interface{}) error {
	ascendjob, ok := job.(*mindxdlv1.AscendJob)
	if !ok {
		return fmt.Errorf("%T is not a type of Job", job)
	}
	if err := r.Delete(context.Background(), ascendjob); err != nil {
		r.recorder.Eventf(ascendjob, v1.EventTypeWarning, FailedDeleteJobReason, "Error deleting: %v", err)
		hwlog.RunLog.Errorf("failed to delete job<%s-%s>, err: %s", ascendjob.Namespace, ascendjob.Name, err)
		return err
	}
	r.recorder.Eventf(ascendjob, v1.EventTypeNormal, SuccessfulDeleteJobReason, "Deleted job: %v", ascendjob.Name)
	hwlog.RunLog.Infof("job<%s-%s> has been deleted", ascendjob.Namespace, ascendjob.Name)
	return nil
}
// UpdateJobStatusInApiServer writes jobStatus to the status subresource of the
// given AscendJob. It updates a deep copy, so the caller's job object is not
// modified.
//
// A nil ReplicaStatuses map in jobStatus is initialised in place before the
// update. Returns an error when jobStatus is nil (instead of panicking) or
// when job is not an *AscendJob; that error names the argument's dynamic
// type. Otherwise it returns the status Update error. The elapsed time is
// logged on every exit after the type check.
func (r *ASJobReconciler) UpdateJobStatusInApiServer(job interface{}, jobStatus *commonv1.JobStatus) error {
	if jobStatus == nil {
		return errors.New("job status is nil")
	}
	if jobStatus.ReplicaStatuses == nil {
		jobStatus.ReplicaStatuses = map[commonv1.ReplicaType]*commonv1.ReplicaStatus{}
	}
	ascendjob, ok := job.(*mindxdlv1.AscendJob)
	if !ok {
		return fmt.Errorf("%T is not a type of Job", job)
	}
	startTime := time.Now()
	defer func() {
		hwlog.RunLog.Infof("Finished updating Job Status %q (%v)",
			ascendjob.Name, time.Since(startTime))
	}()
	ascendjob = ascendjob.DeepCopy()
	ascendjob.Status = *jobStatus.DeepCopy()
	return r.Status().Update(context.Background(), ascendjob)
}
// SetClusterSpec is a no-op for AscendJob and always returns nil. It is part
// of the interface r satisfies as common.JobController.Controller (see
// NewReconciler).
func (r *ASJobReconciler) SetClusterSpec(job interface{}, podTemplate *corev1.PodTemplateSpec,
rtype, index string) error {
return nil
}
// GetDefaultContainerName returns api.DefaultContainerName.
func (r *ASJobReconciler) GetDefaultContainerName() string {
return api.DefaultContainerName
}
// GetDefaultContainerPortName returns api.DefaultPortName.
func (r *ASJobReconciler) GetDefaultContainerPortName() string {
return api.DefaultPortName
}
// IsMasterRole reports whether rtype is a coordinating replica type: the
// MindSpore Scheduler, the PyTorch Master or the TensorFlow Chief. The replica
// specs and the index are ignored.
func (r *ASJobReconciler) IsMasterRole(_ map[commonv1.ReplicaType]*commonv1.ReplicaSpec,
	rtype commonv1.ReplicaType, _ int) bool {
	masterTypes := [...]commonv1.ReplicaType{
		mindxdlv1.MindSporeReplicaTypeScheduler,
		mindxdlv1.PytorchReplicaTypeMaster,
		mindxdlv1.TensorflowReplicaTypeChief,
	}
	for _, master := range masterTypes {
		if rtype == master {
			return true
		}
	}
	return false
}
// writeRanktableToCm stores the job's current rank table in the ConfigMap
// named configmapPrefix+jobName in namespace. It writes:
//   - the serialized table under configmapKey,
//   - the generator timestamp (in base decimal) under configmapVersion.
//
// The ConfigMap must already exist; its Get error is returned unchanged.
// Returns an error when no generator is registered for uid, when
// serialization fails, or when the Update fails.
func (r *ASJobReconciler) writeRanktableToCm(jobName, namespace string, uid types.UID) error {
	cm := &corev1.ConfigMap{}
	key := types.NamespacedName{Namespace: namespace, Name: configmapPrefix + jobName}
	if err := r.Get(context.TODO(), key, cm); err != nil {
		return err
	}
	rtg, ok := r.rtGenerators[uid]
	if !ok {
		return fmt.Errorf("ranktable generator not found for job %s", jobName)
	}
	rankTable, err := rtg.ToString()
	if err != nil {
		return err
	}
	if cm.Data == nil {
		cm.Data = make(map[string]string)
	}
	cm.Data[configmapKey] = rankTable
	cm.Data[configmapVersion] = strconv.FormatUint(rtg.GetTimeStamp(), decimal)
	return r.Update(context.TODO(), cm)
}
// onPodUpdateFunc returns the update predicate for pods owned by AscendJobs.
//   - Events whose objects are not Pods are logged and dropped.
//   - Pods that Volcano is asked to delete (api.NeedVolcanoOpeKey ==
//     api.OpeTypeDelete) are dropped.
//   - When api.NeedOperatorOpeKey newly becomes api.OpeTypeCreate, that is
//     logged.
//   - When it is api.OpeTypeDelete, the pod is force-deleted synchronously.
//
// All remaining events are let through.
func (r *ASJobReconciler) onPodUpdateFunc() func(event.UpdateEvent) bool {
	return func(e event.UpdateEvent) bool {
		oldPod, ok := e.ObjectOld.(*corev1.Pod)
		if !ok {
			hwlog.RunLog.Errorf("objectOld unable to convert object to Pod:%v", e.ObjectOld)
			return false
		}
		newPod, ok := e.ObjectNew.(*corev1.Pod)
		if !ok {
			hwlog.RunLog.Errorf("objectNew unable to convert object to Pod:%v", e.ObjectNew)
			return false
		}
		if newPod.Annotations[api.NeedVolcanoOpeKey] == api.OpeTypeDelete {
			return false
		}

		newOpe := newPod.Annotations[api.NeedOperatorOpeKey]
		oldOpe := oldPod.Annotations[api.NeedOperatorOpeKey]
		if newOpe == api.OpeTypeCreate && oldOpe != api.OpeTypeCreate {
			hwlog.RunLog.Infof("detected needOperatorOpe[create],will create backup pod based on pod %v", newPod.Name)
		}
		if newOpe == api.OpeTypeDelete {
			hwlog.RunLog.Infof("detected needOperatorOpe[delete],will delete pod %v", newPod.Name)
			r.deletePod(newPod)
		}
		return true
	}
}
// deletePod force-deletes pod with a zero grace period and background
// propagation, bounded by a 30-second timeout. Outcomes are logged only:
// success, already gone (NotFound), or any other failure.
func (r *ASJobReconciler) deletePod(pod *corev1.Pod) {
	const deleteTimeout = 30 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), deleteTimeout)
	defer cancel()

	propagation := metav1.DeletePropagationBackground
	var noGrace int64
	opts := &client.DeleteOptions{
		PropagationPolicy:  &propagation,
		GracePeriodSeconds: &noGrace,
	}

	err := r.Delete(ctx, pod, opts)
	switch {
	case err == nil:
		hwlog.RunLog.Infof("successfully force deleted pod %s/%s", pod.Namespace, pod.Name)
	case k8serr.IsNotFound(err):
		hwlog.RunLog.Infof("pod %s/%s not found, might be already deleted", pod.Namespace, pod.Name)
	default:
		hwlog.RunLog.Errorf("failed to force delete pod %s/%s: %v", pod.Namespace, pod.Name, err)
	}
}
// getJobKeyByPod returns the UID of the first owner reference of info that is
// marked as its controller. It returns "" (and logs an error) when info is nil
// or has no controller.
func getJobKeyByPod(info *v1.Pod) types.UID {
	if info == nil {
		hwlog.RunLog.Errorf("serious error, get unique key failed, pod is nil")
		return ""
	}
	if ref := metav1.GetControllerOf(info); ref != nil {
		return ref.UID
	}
	hwlog.RunLog.Error("serious error, get unique key failed, pod don't have controller")
	return ""
}