Copyright(C)2025. Huawei Technologies Co.,Ltd. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
Package rescheduling is using for HuaWei Ascend pin fault rescheduling.
*/
package rescheduling
import (
"errors"
"strconv"
"k8s.io/klog"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/plugins/ascend-volcano-plugin/common/util"
"volcano.sh/volcano/pkg/scheduler/plugins/ascend-volcano-plugin/plugin"
)
func is910A5SuperPodJob(schedulerJob *plugin.SchedulerJob) bool {
if schedulerJob == nil {
return false
}
return schedulerJob.Annotation[util.SchedulePolicyAnnoKey] == util.Chip8Node8Ra64Sp
}
func (fJob *FaultJob) GraceDeleteJobFor910A5(ssn *framework.Session, npuJob *plugin.SchedulerJob,
env plugin.ScheduleEnv) error {
if fJob == nil {
return errors.New("getJobFaultRescheduleLabel fJob object does not exist")
}
if ssn == nil {
return errors.New("session does not exist")
}
if npuJob == nil {
return errors.New("schedulerJob does not exist")
}
reason := fJob.getRestartInfosFor910A5()
isSuperPod := false
vSuperPodIds := make([]string, 0)
if _, ok := npuJob.Annotation[util.SuperPodAnnoKey]; ok {
isSuperPod = true
vSuperPodIds = fJob.getVSuperPodIds()
}
fJob.judgeJobIsMasterFault(vSuperPodIds)
fJob.updateSuperPodsReschdInfo(env)
dpi := &deletePodInfo{
isSuperPod: isSuperPod,
ids: vSuperPodIds,
reason: reason,
}
fJob.graceDeletePodsFor910A5(ssn, npuJob, env, dpi)
return nil
}
func (fJob *FaultJob) getRestartInfosFor910A5() string {
var reasonList []FaultReasonList
for _, fTask := range fJob.FaultTasks {
if fTask.Reason != nil {
reasonList = append(reasonList, fTask.Reason...)
}
}
reason := GetTaskRestartReason(reasonList)
return reason
}
func (fJob *FaultJob) graceDeletePodsFor910A5(ssn *framework.Session, npuJob *plugin.SchedulerJob,
env plugin.ScheduleEnv, dpi *deletePodInfo) {
for id, fTask := range fJob.FaultTasks {
npuTask, ok := npuJob.Tasks[fTask.TaskUID]
if !ok {
klog.V(util.LogDebugLev).Infof(
"rescheduling: skip grace delete because task<%s> for job <%s/%s> has been deleted in session",
fTask.TaskName, fJob.JobNamespace, fJob.JobName)
continue
}
if fJob.skipThisTask(dpi, fTask, npuJob) {
klog.V(util.LogDebugLev).Infof(
"rescheduling: skip grace delete task<%s> for job<%s/%s>", fTask.TaskName, fJob.JobNamespace,
fJob.JobName)
continue
}
klog.V(util.LogDebugLev).Infof(
"rescheduling: start to grace delete task<%s> for job<%s/%s>", fTask.TaskName, fJob.JobNamespace,
fJob.JobName)
fJob.FaultTasks[id].IsSatisfiedRackAffinity = false
fJob.updateSuperPodMapInfo(env, fTask.TaskName, fTask.NodeName)
if delErr := npuTask.ForceDeletePodByTaskInf(ssn, dpi.reason, fTask.NodeName); delErr != nil {
klog.V(util.LogErrorLev).Infof("rescheduling: ForceDeletePodByTaskInf %s: %s.", npuTask.Name, delErr)
continue
}
klog.V(util.LogDebugLev).Infof(
"rescheduling: grace delete task<%s> for job<%s/%s> succeeded", fTask.TaskName, fJob.JobNamespace,
fJob.JobName)
fJob.FaultTasks[id].IsBeingGracefulDeleted = true
}
}
func (fJob *FaultJob) ForceDeleteJobFor910A5(schedulerJob *plugin.SchedulerJob,
env plugin.ScheduleEnv) error {
klog.V(util.LogDebugLev).Infof("enter ForceDeleteJob")
if fJob == nil || schedulerJob == nil {
return errors.New("getJobFaultRescheduleLabel fJob object or ssn or schedulerJob does not exist")
}
isSuperPod := false
vSuperPodIds := make([]string, 0)
if _, ok := schedulerJob.Annotation[util.SuperPodAnnoKey]; ok {
isSuperPod = true
vSuperPodIds = fJob.getVSuperPodIds()
}
fJob.updateSuperPodsReschdInfo(env)
fJob.judgeJobIsMasterFault(vSuperPodIds)
dpi := &deletePodInfo{
isSuperPod: isSuperPod,
ids: vSuperPodIds,
}
fJob.forceDeletePodsFor910A5(schedulerJob, env, dpi)
return nil
}
func (fJob *FaultJob) judgeJobIsMasterFault(vSuperPodIds []string) {
for _, fTask := range fJob.FaultTasks {
if fTask.NodeRankIndex != util.Rank0 {
continue
}
if fTask.IsFaultTask {
fJob.IsMasterFault = true
return
}
if fJob.PendingSessionNum >= tpPendingTimes && fJob.inTheSameTpBlock(fTask) {
klog.V(util.LogInfoLev).Infof("master pod and fault task is in the same tpBlock")
fJob.IsMasterFault = true
return
}
if fJob.PendingSessionNum >= spPendingTimes && fJob.inTheSameVSuperPod(vSuperPodIds, fTask.NodeName) {
klog.V(util.LogInfoLev).Infof("master pod and fault task is in the same vsuperpod")
fJob.IsMasterFault = true
return
}
}
fJob.IsMasterFault = false
}
func (fJob *FaultJob) getVSuperPodIds() []string {
var ids []string
for _, task := range fJob.FaultTasks {
if task.IsFaultTask {
id := fJob.getVSuperPodId(task.NodeName)
if id != "" {
ids = append(ids, id)
}
}
}
klog.V(util.LogInfoLev).Infof("getVSuperPodIds super pod vSuperPodIds:%v", ids)
return ids
}
func (fJob *FaultJob) getVSuperPodId(node string) string {
for id, superNodes := range fJob.SuperPods {
for _, superNode := range superNodes {
if superNode.Name == node {
return id
}
}
}
return ""
}
func (fJob *FaultJob) forceDeletePodsFor910A5(schedulerJob *plugin.SchedulerJob, env plugin.ScheduleEnv,
dpi *deletePodInfo) {
var waitDeleteTask = make([]FaultTask, 0)
for id, fTask := range fJob.FaultTasks {
klog.V(util.LogDebugLev).Infof("not masterFault is %v, job single rescheduling is %v ,"+
"not fault task is %v, allow upgrade is %v", !fJob.IsMasterFault,
fJob.IsJobSingleRescheduling(schedulerJob),
!fTask.IsFaultTask, fJob.allowUpgradePodRescheduling())
if fJob.skipThisTask(dpi, fTask, schedulerJob) {
klog.V(util.LogDebugLev).Infof(
"rescheduling: skip force delete task<%s> for job<%s/%s>", fTask.TaskName, fJob.JobNamespace,
fJob.JobName)
continue
}
klog.V(util.LogDebugLev).Infof(
"rescheduling: start to force delete task<%s> for job<%s/%s>", fTask.TaskName, fJob.JobNamespace,
fJob.JobName)
if fTask.NodeRankIndex == util.Rank0 {
klog.V(util.LogInfoLev).Infof("master pod will be deleted, set fJob.IsMasterFault true")
fJob.IsMasterFault = true
}
fJob.FaultTasks[id].IsSatisfiedRackAffinity = false
fJob.updateSuperPodMapInfo(env, fTask.TaskName, fTask.NodeName)
waitDeleteTask = append(waitDeleteTask, fTask)
klog.V(util.LogInfoLev).Infof("superpod delete pods:%s", fTask.TaskName)
}
fJob.deletingTasksConcurrently(waitDeleteTask, env.FrameAttr.KubeClient)
}
func (fJob *FaultJob) skipThisTask(dpi *deletePodInfo, fTask FaultTask, schedulerJob *plugin.SchedulerJob) bool {
if !fJob.allowUpgradePodRescheduling() {
return !fTask.IsFaultTask
}
if fJob.IsMasterFault {
return false
}
if res, continueProcess := fJob.processReschedulingSkipTaskforA5(fTask, schedulerJob); !continueProcess {
return res
}
return fJob.podReschedulingSkipTask(dpi, fTask, schedulerJob)
}
func (fJob *FaultJob) processReschedulingSkipTaskforA5(fTask FaultTask,
schedulerJob *plugin.SchedulerJob) (bool, bool) {
if !fJob.IsProcessReschedulingJob(schedulerJob) {
return false, true
}
if !fTask.IsFaultTask {
if fJob.PendingSessionNum < tpPendingTimes {
klog.V(util.LogInfoLev).Infof("skip because %s rescheduling is the first stage", fJob.JobName)
return true, false
}
if !fJob.inTheSameTpBlock(fTask) {
klog.V(util.LogInfoLev).Infof("skip because %s and fault task is not inTheSameTpBlock", fTask.TaskName)
return true, false
}
}
return false, false
}
func (fJob *FaultJob) IsProcessReschedulePause(sJob *plugin.SchedulerJob) bool {
return sJob.Label[util.ProcessRecoverEnable] == util.ProcessRecoverPause
}
func (fJob *FaultJob) podReschedulingSkipTask(dpi *deletePodInfo, fTask FaultTask,
schedulerJob *plugin.SchedulerJob) bool {
if fJob.IsJobSingleRescheduling(schedulerJob) && !fTask.IsFaultTask {
if !dpi.isSuperPod {
klog.V(util.LogInfoLev).Infof("skip because %s is not super pod", fJob.JobName)
return true
}
if fJob.PendingSessionNum < tpPendingTimes {
klog.V(util.LogInfoLev).Infof("skip because %s rescheduling is the first stage", fJob.JobName)
return true
}
if fJob.PendingSessionNum < spPendingTimes && !fJob.inTheSameTpBlock(fTask) {
klog.V(util.LogInfoLev).Infof("skip because %s and fault task is not inTheSameTpBlock", fTask.TaskName)
return true
}
if fJob.PendingSessionNum < pendingTimes && !fJob.inTheSameVSuperPod(dpi.ids, fTask.NodeName) {
klog.V(util.LogInfoLev).Infof("skip because %s and fault task is not inTheSameVSuperPod", fTask.TaskName)
return true
}
}
return false
}
func (fJob *FaultJob) inTheSameVSuperPod(ids []string, nodeName string) bool {
for _, v := range ids {
nodes, ok := fJob.SuperPods[v]
if !ok {
klog.V(util.LogErrorLev).Infof("superpod id does not exist in fJob.SuperPods, id: %s", v)
return false
}
for _, each := range nodes {
if each.Name == nodeName {
return true
}
}
}
return false
}
func (fJob *FaultJob) inTheSameTpBlock(fTask FaultTask) bool {
if fJob.TpBlock <= forceRackAffinityLimit {
return false
}
fTaskRankId, err := strconv.Atoi(fTask.NodeRankIndex)
if err != nil {
klog.V(util.LogErrorLev).Infof("ftask NodeRankIndex cannot be converted to int, targetRankId: %s",
fTask.NodeRankIndex)
return false
}
for _, task := range fJob.FaultTasks {
if !task.IsFaultTask {
continue
}
targetRankId, err := strconv.Atoi(task.NodeRankIndex)
if err != nil {
klog.V(util.LogErrorLev).Infof("target task NodeRankIndex cannot be converted to int, targetRankId:"+
" %s", task.NodeRankIndex)
return false
}
targetRankId = targetRankId - targetRankId%fJob.TpBlock
if fTaskRankId >= targetRankId && fTaskRankId < targetRankId+fJob.TpBlock {
return true
}
}
return false
}