Copyright(C)2020-2022. Huawei Technologies Co.,Ltd. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
Package rescheduling is used for Huawei Ascend pin fault rescheduling.
*/
package rescheduling
import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
"strings"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/plugins/ascend-volcano-plugin/common/k8s"
"volcano.sh/volcano/pkg/scheduler/plugins/ascend-volcano-plugin/common/util"
"volcano.sh/volcano/pkg/scheduler/plugins/ascend-volcano-plugin/internal/consts"
"volcano.sh/volcano/pkg/scheduler/plugins/ascend-volcano-plugin/plugin"
)
// setGraceOverTime stores value as the rescheduler's GraceDeleteTime, the
// threshold that isDelayingJobTimeout compares elapsed seconds against.
func (reScheduler *ReScheduler) setGraceOverTime(value int64) {
	reScheduler.GraceDeleteTime = value
}
// createFaultTaskHandler builds one FaultTask for every task of job.
//
// For each task it fills in the node rank index, the used card names, the
// card health reasons, and finally whether the task is a fault task together
// with its health state. Failures of the individual lookups are only logged;
// the task is still appended with whatever values were obtained.
// The returned error is always nil.
func (reScheduler *ReScheduler) createFaultTaskHandler(job *api.JobInfo, cardName string,
	env plugin.ScheduleEnv, faultJob *FaultJob) ([]FaultTask, error) {
	result := make([]FaultTask, 0)
	for _, taskInfo := range job.Tasks {
		fTask := newFaultTaskDefault(taskInfo, faultJob, env)

		rankIndex, rankErr := fTask.getNodeRankIndex(taskInfo)
		if rankErr != nil {
			klog.V(util.LogInfoLev).Infof("getNodeRankIndex %s %s.", taskInfo.Name, util.SafePrint(rankErr))
		}
		fTask.setNodeRankIndex(rankIndex)

		usedCards, cardErr := fTask.getUseCardName(taskInfo, cardName, env)
		if cardErr != nil {
			klog.V(util.LogInfoLev).Infof("getUseCardName %s %s", taskInfo.Name, util.SafePrint(cardErr))
		}
		fTask.setUseCardName(usedCards)

		if codeErr := reScheduler.setTaskCardHealthCode(&fTask); codeErr != nil {
			klog.V(util.LogDebugLev).Infof("setTaskCardHealthCode task %s err %s", taskInfo.Name,
				util.SafePrint(codeErr))
		}

		faulty, state := reScheduler.getTaskHealthState(&fTask, taskInfo, faultJob.SubHealthyStrategy)
		klog.V(util.LogDebugLev).Infof("task %s is fault task: %v, health state: %s", taskInfo.Name, faulty,
			state)
		if faulty {
			// Fault tasks are reported a second time at warning level.
			klog.V(util.LogWarningLev).Infof("task %s is fault task: %v, health state: %s", taskInfo.Name, faulty,
				state)
		}
		fTask.setIsFaultTask(faulty)
		fTask.setFaultType(state)
		fTask.setIsSatisfiedRackAffinity(true)

		result = append(result, fTask)
	}
	return result, nil
}
// GetRunningJobs collects the session jobs the rescheduler should consider.
//
// A job is kept only when its pod group phase is running or unknown, it is
// present in reScheduler.Jobs with a non-nil NPUJob, and it is not an NPU job
// whose requested NPU number is zero. The result is keyed by job UID.
func (reScheduler *ReScheduler) GetRunningJobs(ssn *framework.Session) map[api.JobID]*api.JobInfo {
	runningJobs := make(map[api.JobID]*api.JobInfo, util.MapInitNum)
	for _, jobInfo := range ssn.Jobs {
		phase := jobInfo.PodGroup.Status.Phase
		if phase != util.PodGroupRunning && phase != util.PodGroupUnknown {
			klog.V(util.LogWarningLev).Infof("job %s pod group is not running but %s, skip", jobInfo.Name,
				phase)
			continue
		}

		cachedJob, known := reScheduler.Jobs[jobInfo.UID]
		if !known || cachedJob.NPUJob == nil {
			klog.V(util.LogWarningLev).Infof("job %s not in session, skip", jobInfo.UID)
			continue
		}

		if cachedJob.ReqNPUNum == 0 && cachedJob.NPUJob.IsNPUJob() {
			klog.V(util.LogWarningLev).Infof("job %s requires npu %d is illegal, skip",
				cachedJob.Name, cachedJob.ReqNPUNum)
			continue
		}

		runningJobs[jobInfo.UID] = jobInfo
	}
	return runningJobs
}
// updateNewFaultJobAttr populates a freshly created faultJob from the cached
// scheduler job and from the tasks of jobInfo, then returns the same pointer.
//
// When the job's reschedule label equals JobOffRescheduleLabelValue only the
// common attributes and the label are set and the function returns early.
// Otherwise fault tasks are created, the fault-job flags are derived from
// them, and a RemainRetryTimes entry is registered for the job if none exists.
func (reScheduler *ReScheduler) updateNewFaultJobAttr(
	faultJob *FaultJob, jobInfo *api.JobInfo, env plugin.ScheduleEnv) *FaultJob {
	schedJob := reScheduler.Jobs[faultJob.JobUID]
	faultJob.setCommonAttrFromScheduleJob(schedJob)
	faultJob.setJobSubHealthyStrategy()
	faultJob.setReScheduleLimit()
	faultJob.setFaultRetryTimeOfJob()

	rescheduleLabel := faultJob.GetJobFaultRescheduleLabel()
	faultJob.setJobFaultReScheduleLabel(rescheduleLabel)
	klog.V(util.LogDebugLev).Infof("job %s set rescheduleLabel %v", jobInfo.Name, rescheduleLabel)
	if rescheduleLabel == JobOffRescheduleLabelValue {
		klog.V(util.LogInfoLev).Infof("job %s rescheduleLabel off, skip rescheduling.", jobInfo.Name)
		return faultJob
	}

	cardName := util.GetNpuNameFromJobRequire(schedJob.ReqNPUName)
	faultTasks, createErr := reScheduler.createFaultTaskHandler(jobInfo, cardName, env, faultJob)
	if createErr != nil {
		klog.V(util.LogInfoLev).Infof("job %s createFaultTaskHandler failed: %s", jobInfo.Name,
			util.SafePrint(createErr))
	}
	faultJob.setFaultTasks(faultTasks)

	isFault := faultJob.getIsFaultJob()
	klog.V(util.LogDebugLev).Infof("job %s if fault job: %v", faultJob.JobName, isFault)
	faultJob.setIsFaultJob(isFault)
	faultJob.updateFaultJobInfo(schedJob, reScheduler, env)
	faultJob.setIsSubHealthFault()
	klog.V(util.LogDebugLev).Infof("job %s fault types: %v", faultJob.JobName, faultJob.FaultTypes)

	// Keep an already registered retry counter untouched.
	if _, registered := reScheduler.JobRemainRetryTimes[faultJob.JobUID]; registered {
		return faultJob
	}
	if reScheduler.JobRemainRetryTimes == nil {
		reScheduler.JobRemainRetryTimes = make(map[api.JobID]*RemainRetryTimes)
	}
	reScheduler.JobRemainRetryTimes[faultJob.JobUID] = &RemainRetryTimes{
		UUID:  faultJob.UUID,
		Times: faultJob.FaultRetryTimes,
	}
	return faultJob
}
// updateFaultJobInfo copies the super-pod layout of npuJob onto fJob and, when
// fJob is a fault job, collects the fault types of its fault tasks and swaps
// replaced nodes inside npuJob.SuperPods.
//
// npuJob is received by value; the modified copy is written back into
// reScheduler.Jobs under fJob.JobUID (immediately after a rebuild, and again
// on return for fault jobs via the deferred assignment).
func (fJob *FaultJob) updateFaultJobInfo(npuJob plugin.SchedulerJob, reScheduler *ReScheduler, env plugin.ScheduleEnv) {
if npuJob.SuperPods == nil {
// No layout cached on the scheduler job: fill it from rebuildScheduledSuperPods
// and persist it right away.
npuJob.SuperPods = rebuildScheduledSuperPods(npuJob, env)
reScheduler.Jobs[fJob.JobUID] = npuJob
}
fJob.SuperPods = npuJob.SuperPods
if !fJob.IsFaultJob {
return
}
// Every exit below must store the (possibly modified) local copy of npuJob.
defer func() { reScheduler.Jobs[fJob.JobUID] = npuJob }()
// replaceNodes maps a task name to a fixed-size record indexed by
// superPodRankIndex, originNodeNameIndex and newNodeNameIndex.
replaceNodes := make(map[string][]string, 1)
for _, fTask := range fJob.FaultTasks {
if fTask.IsFaultTask {
fJob.FaultTypes = append(fJob.FaultTypes, fTask.faultType)
}
if fTask.IsFaultTask && !fTask.IsHotSwitchDelete {
// A fault task that is not a hot-switch delete drops the cached layout.
// The loop keeps going so the remaining fault types are still collected.
npuJob.SuperPods = nil
continue
}
// Task carrying the backup-new-pod annotation: record its super pod rank and
// its current node as the origin node, keyed by its own task name.
if _, ok := fTask.Annotations[consts.BackupNewPodNameKey]; ok {
if len(replaceNodes[fTask.TaskName]) == 0 {
replaceNodes[fTask.TaskName] = make([]string, replaceNodesLen)
}
if superPodRank, ok := fTask.Annotations[util.SuperPodRankKey]; ok {
replaceNodes[fTask.TaskName][superPodRankIndex] = superPodRank
}
replaceNodes[fTask.TaskName][originNodeNameIndex] = fTask.NodeName
}
// Task carrying the backup-source-pod annotation: record its node as the new
// node, keyed by the source pod name taken from the annotation value.
if sourcePod, ok := fTask.Annotations[consts.BackupSourcePodNameKey]; ok {
if len(replaceNodes[sourcePod]) == 0 {
replaceNodes[sourcePod] = make([]string, replaceNodesLen)
}
replaceNodes[sourcePod][newNodeNameIndex] = fTask.NodeName
}
}
if npuJob.SuperPods == nil {
return
}
// Within the super pod selected by the recorded rank, rename every node whose
// name equals the origin node to the new node name.
for _, item := range replaceNodes {
for index, sn := range npuJob.SuperPods[item[superPodRankIndex]] {
if sn.Name == item[originNodeNameIndex] {
sn.Name = item[newNodeNameIndex]
npuJob.SuperPods[item[superPodRankIndex]][index] = sn
}
}
}
}
// AddFaultJobWithSession adds a FaultJob to reScheduler.FaultJobs for every
// job in jobs that is not cached yet, then re-initialises the super pod info
// held by env.
//
// It returns an error built from util.ArgumentError when the receiver is nil,
// otherwise nil.
func (reScheduler *ReScheduler) AddFaultJobWithSession(
	jobs map[api.JobID]*api.JobInfo, env plugin.ScheduleEnv) error {
	klog.V(util.LogDebugLev).Info("enter AddFaultJobWithSession ... ")
	defer klog.V(util.LogDebugLev).Info("leave AddFaultJobWithSession ... ")
	if reScheduler == nil {
		klog.V(util.LogDebugLev).Infof("AddFaultJobWithSession: %s, nil reScheduler or job", util.ArgumentError)
		return errors.New(util.ArgumentError)
	}
	klog.V(util.LogDebugLev).Infof("ReSchedulerCache fault jobs before add: %v",
		string(util.MarshalData(reScheduler.FaultJobs)))

	// All jobs added in this call share one creation timestamp.
	createTime := time.Now().Unix()
	for _, jobInfo := range jobs {
		klog.V(util.LogDebugLev).Infof("ReSchedulerCache considering job %s", jobInfo.Name)
		if _, cached := reScheduler.FaultJobs[jobInfo.UID]; cached {
			continue
		}
		klog.V(util.LogDebugLev).Infof("Add job %s to cache", jobInfo.Name)
		fJob := reScheduler.updateNewFaultJobAttr(newFaultJobDefault(jobInfo, createTime), jobInfo, env)
		reScheduler.FaultJobs[jobInfo.UID] = fJob
		if fJob.IsFaultJob {
			klog.V(util.LogInfoLev).Infof("job %s is fault job, add to fault jobs", fJob.JobName)
		}
	}

	reScheduler.initSuperPodInfo(env)
	klog.V(util.LogDebugLev).Infof("ReSchedulerCache fault jobs after add: %v",
		string(util.MarshalData(reScheduler.FaultJobs)))
	return nil
}
// initSuperPodInfo replaces the three per-job maps of env.SuperPodInfo with
// new maps that contain only the entries whose job UID belongs to a job in
// reScheduler.FaultJobs.
func (reScheduler *ReScheduler) initSuperPodInfo(env plugin.ScheduleEnv) {
	keptReschdInfo := make(map[api.JobID]map[string][]plugin.SuperNode)
	keptFaultTaskNodes := make(map[api.JobID][]string)
	keptMapFaultTaskNodes := make(map[api.JobID]map[string]string)

	for _, fJob := range reScheduler.FaultJobs {
		uid := fJob.JobUID
		if info, found := env.SuperPodInfo.SuperPodReschdInfo[uid]; found {
			keptReschdInfo[uid] = info
		}
		if nodes, found := env.SuperPodInfo.SuperPodFaultTaskNodes[uid]; found {
			keptFaultTaskNodes[uid] = nodes
		}
		if nodeMap, found := env.SuperPodInfo.SuperPodMapFaultTaskNodes[uid]; found {
			keptMapFaultTaskNodes[uid] = nodeMap
		}
	}

	env.SuperPodInfo.SuperPodReschdInfo = keptReschdInfo
	env.SuperPodInfo.SuperPodFaultTaskNodes = keptFaultTaskNodes
	env.SuperPodInfo.SuperPodMapFaultTaskNodes = keptMapFaultTaskNodes
}
// GetTaskRestartReason returns reasonList encoded as a JSON string.
// If marshalling fails the error is logged and an empty string is returned.
func GetTaskRestartReason(reasonList []FaultReasonList) string {
	encoded, err := json.Marshal(reasonList)
	if err != nil {
		// Log under this function's own name so the failure can be located.
		klog.V(util.LogInfoLev).Infof("GetTaskRestartReason marshal: %s.", util.SafePrint(err))
		return ""
	}
	return string(encoded)
}
// getGraceDeleteFaultJobs returns the cached fault jobs whose ReScheduleKey is
// JobGraceRescheduleLabelValue. The result is nil when there is none.
func (reScheduler ReScheduler) getGraceDeleteFaultJobs() []*FaultJob {
	var selected []*FaultJob
	for _, candidate := range reScheduler.FaultJobs {
		if candidate.IsFaultJob && candidate.ReScheduleKey == JobGraceRescheduleLabelValue {
			selected = append(selected, candidate)
		}
	}
	return selected
}
// GetNeedForceDeleteDelayingNPUJobs returns the scheduler jobs whose grace
// delete has neither succeeded nor finished within the grace period.
//
// It returns an error built from util.ArgumentError on nil/empty arguments and
// an error built from getNoneJobsErr when no job qualifies.
func (reScheduler *ReScheduler) GetNeedForceDeleteDelayingNPUJobs(
	schedulerJobs map[api.JobID]plugin.SchedulerJob, ssn *framework.Session) ([]plugin.SchedulerJob, error) {
	klog.V(util.LogDebugLev).Infof("enter GetNeedForceDeleteDelayingNPUJobs ... ")
	defer klog.V(util.LogDebugLev).Infof("leave GetNeedForceDeleteDelayingNPUJobs ... ")
	if reScheduler == nil || len(schedulerJobs) == 0 || ssn == nil {
		klog.V(util.LogDebugLev).Infof("GetNeedForceDeleteDelayingNPUJobs: %s, "+
			"nil reScheduler or schedulerJobs or session", util.ArgumentError)
		return nil, errors.New(util.ArgumentError)
	}

	timedOut := make([]plugin.SchedulerJob, 0)
	for _, fJob := range reScheduler.getGraceDeleteFaultJobs() {
		sessionJob := fJob.jobInfoInSession(ssn.Jobs)
		if sessionJob == nil {
			klog.V(util.LogDebugLev).Infof(
				"GetNeedForceDeleteDelayingNPUJobs %v not in ssn.Jobs.", fJob.JobName)
			continue
		}
		// Short-circuit keeps the timeout check from running for jobs already deleted.
		if fJob.isJobGraceDeleteSuccess(sessionJob, false) || !reScheduler.isDelayingJobTimeout(fJob) {
			continue
		}
		klog.V(util.LogWarningLev).Infof("grace delete job %s is time out for force delete.", fJob.JobName)
		if sJob, found := schedulerJobs[fJob.JobUID]; found {
			timedOut = append(timedOut, sJob)
		}
	}

	if len(timedOut) == 0 {
		klog.V(util.LogInfoLev).Infof("GetNeedForceDeleteDelayingNPUJobs get nil jobs.")
		return nil, errors.New(getNoneJobsErr)
	}
	return timedOut, nil
}
// isDelayingJobTimeout reports whether more than GraceDeleteTime seconds have
// elapsed since fJob.UpdateTime.
func (reScheduler *ReScheduler) isDelayingJobTimeout(fJob *FaultJob) bool {
	now := time.Now().Unix()
	lastUpdate := fJob.UpdateTime
	klog.V(util.LogDebugLev).Infof("isDelayingJobTimeOut now: %v create: %v.", now, lastUpdate)

	elapsed := now - lastUpdate
	if elapsed <= reScheduler.GraceDeleteTime {
		return false
	}
	klog.V(util.LogInfoLev).Infof("Time out: %v > %v", elapsed, reScheduler.GraceDeleteTime)
	return true
}
// synCacheFaultJobWithSession rebuilds reScheduler.FaultJobs, keeping only the
// cached fault jobs that are still relevant for the given session.
//
// A cached job is dropped when it is older than maxIntervalTime plus
// GraceDeleteTime, when it is no longer found in ssn.Jobs, when its grace
// delete succeeded and enough tasks are allocated again, or when it is on
// elastic scheduling. Every kept job is refreshed before being stored.
func (reScheduler *ReScheduler) synCacheFaultJobWithSession(ssn *framework.Session) {
klog.V(util.LogDebugLev).Infof("enter synCacheFaultJobWithSession...")
defer klog.V(util.LogDebugLev).Infof("leave synCacheFaultJobWithSession...")
updatedFaultJobs := make(map[api.JobID]*FaultJob)
nowTime := time.Now().Unix()
for jobId, faultJob := range reScheduler.FaultJobs {
// Expire entries that were not updated for too long.
if nowTime-faultJob.UpdateTime > maxIntervalTime+reScheduler.GraceDeleteTime {
klog.V(util.LogWarningLev).Infof("rescheduling: delete %s from CM for overTime %v => %v.",
faultJob.JobName, nowTime, faultJob.UpdateTime)
continue
}
jobInfo := faultJob.jobInfoInSession(ssn.Jobs)
if jobInfo == nil {
klog.V(util.LogWarningLev).Infof("rescheduling: faultJob name: %s not in session", faultJob.JobName)
continue
}
sJob := reScheduler.Jobs[faultJob.JobUID]
is910A5 := is910A5SuperPodJob(&sJob)
if faultJob.isJobGraceDeleteSuccess(jobInfo, is910A5) {
reScheduler.updateFaultJobWhenGraceDeleteSuccess(jobInfo, faultJob, is910A5)
// Once at least MinAvailable tasks are allocated the job counts as
// rescheduled: reset the grace exit code and stop tracking it.
if plugin.GetJobInfoAllocatedTaskNum(jobInfo) >= faultJob.MinAvailable {
faultJob.resetGraceExitCode(ssn.KubeClient())
klog.V(util.LogInfoLev).Infof(
"rescheduling: skip job <%s/%s> because the job is rescheduled",
faultJob.JobNamespace, faultJob.JobName)
continue
}
}
if faultJob.ElasticScheduling == JobOnElasticScheduling {
klog.V(util.LogDebugLev).Infof(
"rescheduling: skip restart job <%s/%s> because the job is on elastic scheduling",
faultJob.JobNamespace, faultJob.JobName)
continue
}
// Health codes and pod UIDs are refreshed only while the delete has not
// been executed yet.
if !faultJob.DeleteExecutedFlag {
reScheduler.updateJobHealthCode(faultJob)
faultJob.updateTaskPodUid(jobInfo)
}
reScheduler.setFaultTaskUseNodeLinkDownTime(faultJob)
faultJob.setCommonAttrFromScheduleJob(reScheduler.Jobs[jobInfo.UID])
klog.V(util.LogDebugLev).Infof(
"rescheduling: synCacheFaultJobWithSession add job <%s/%s>", faultJob.JobNamespace, faultJob.JobName)
updatedFaultJobs[jobId] = faultJob
}
reScheduler.setFaultJobs(updatedFaultJobs)
klog.V(util.LogDebugLev).Infof("ReSchedulerCache fault jobs after sync: %v",
string(util.MarshalData(reScheduler.FaultJobs)))
}
// setFaultTaskUseNodeLinkDownTime copies the FaultTime of every relation-fault
// task of fJob onto the LinkDownTime of the fault node it runs on, provided
// that node reports a device fault with level NotHandleFault and code
// linkDownFaultCode.
func (reScheduler *ReScheduler) setFaultTaskUseNodeLinkDownTime(fJob *FaultJob) {
	for i := range fJob.FaultTasks {
		task := &fJob.FaultTasks[i]
		if !(task.IsFaultTask && task.faultType == util.RelationFault) {
			continue
		}
		node, known := reScheduler.FaultNodes[task.NodeName]
		if !known {
			continue
		}

		linkDownFound := false
		for _, device := range node.FaultDeviceList {
			if device.FaultLevel == NotHandleFault && device.FaultCode == linkDownFaultCode {
				linkDownFound = true
				break
			}
		}
		if linkDownFound {
			node.LinkDownTime = task.FaultTime
		}
	}
}
// singlePodReschedulingUpgrade counts one more pending session for a job that
// has the single-pod tag enabled, allows upgrading pod rescheduling and does
// not have process recovery enabled. When the counter reaches pendingTimes
// (or spPendingTimes for a super pod / multi-level job) DeleteExecutedFlag is
// cleared.
func (reScheduler *ReScheduler) singlePodReschedulingUpgrade(fJob *FaultJob) {
	if fJob.Labels[util.SinglePodTag] != util.EnableFunc ||
		!fJob.allowUpgradePodRescheduling() ||
		fJob.Labels[util.ProcessRecoverEnable] == util.EnableFunc {
		return
	}

	fJob.PendingSessionNum++

	job, ok := reScheduler.Jobs[fJob.JobUID]
	spLimitReached := ok && (job.IsSuperPodJob() || job.IsMultiLevelJob()) &&
		fJob.PendingSessionNum == spPendingTimes
	if spLimitReached || fJob.PendingSessionNum == pendingTimes {
		fJob.DeleteExecutedFlag = false
	}
}
// SyncJobRemainRetryTimes drops the remain-retry-times entries of jobs that
// are no longer in the session or whose UUID no longer matches the session
// job, and stores the filtered map back on the rescheduler.
//
// Nothing is done when the receiver or ssn is nil.
func (reScheduler *ReScheduler) SyncJobRemainRetryTimes(ssn *framework.Session) {
	klog.V(util.LogDebugLev).Info("enter SyncJobRemainRetryTimes...")
	defer klog.V(util.LogDebugLev).Info("leave SyncJobRemainRetryTimes...")
	if reScheduler == nil || ssn == nil {
		klog.V(util.LogErrorLev).Infof("SyncJobRemainRetryTimes: %s, nil reScheduler or session",
			util.ArgumentError)
		return
	}
	klog.V(util.LogDebugLev).Infof("job remain retry times, sync before: %v", reScheduler.JobRemainRetryTimes)
	// Arguments of a deferred call are evaluated when the defer statement runs,
	// so the map must be read inside a closure to log the state after the sync.
	defer func() {
		klog.V(util.LogDebugLev).Infof("job remain retry times, sync after: %v", reScheduler.JobRemainRetryTimes)
	}()

	newInfo := make(map[api.JobID]*RemainRetryTimes)
	for jobID, rt := range reScheduler.JobRemainRetryTimes {
		job, ok := ssn.Jobs[jobID]
		if !ok {
			klog.V(util.LogWarningLev).Infof("job<%s> is not session, remain retry times will be delete", jobID)
			continue
		}
		// A nil entry carries no counter; a UUID mismatch means the entry
		// belongs to a different job than the one in the session.
		if rt == nil || util.UuidOfJob(job) != rt.UUID {
			continue
		}
		newInfo[jobID] = rt
	}
	reScheduler.JobRemainRetryTimes = newInfo
}
// SyncJobRecentRescheduleReason drops the reschedule records of jobs that are
// no longer in the session and stores the filtered map back on the
// rescheduler.
//
// Nothing is done when the receiver or ssn is nil.
func (reScheduler *ReScheduler) SyncJobRecentRescheduleReason(ssn *framework.Session) {
	klog.V(util.LogDebugLev).Info("enter SyncJobRecentRescheduleReason...")
	defer klog.V(util.LogDebugLev).Info("leave SyncJobRecentRescheduleReason...")
	if reScheduler == nil || ssn == nil {
		klog.V(util.LogErrorLev).Infof("SyncJobRecentRescheduleReason: %s, nil reScheduler or session",
			util.ArgumentError)
		return
	}
	klog.V(util.LogDebugLev).Infof("job reschedule records, sync before: %v", reScheduler.JobRecentRescheduleRecords)
	// Arguments of a deferred call are evaluated when the defer statement runs,
	// so the map must be read inside a closure to log the state after the sync.
	defer func() {
		klog.V(util.LogDebugLev).Infof("job reschedule records, sync after: %v",
			reScheduler.JobRecentRescheduleRecords)
	}()

	newInfo := make(map[api.JobID]*RescheduleReason)
	for jobID, rescheduleRecord := range reScheduler.JobRecentRescheduleRecords {
		if _, ok := ssn.Jobs[jobID]; !ok {
			klog.V(util.LogWarningLev).Infof("job<%s> is not session, job reschedule records will delete it", jobID)
			continue
		}
		newInfo[jobID] = rescheduleRecord
	}
	reScheduler.JobRecentRescheduleRecords = newInfo
}
// AddFaultNodeWithSession builds a FaultNode for every node in
// reScheduler.Nodes and installs the resulting map through setFaultNodes.
//
// A node with NPU resources whose chip kind cannot be determined is left out.
// Per-node problems are gathered in an error collector and printed once at
// the end.
func (reScheduler *ReScheduler) AddFaultNodeWithSession() {
	klog.V(util.LogDebugLev).Infof("enter AddFaultNodeWithSession ...")
	defer klog.V(util.LogDebugLev).Infof("leave AddFaultNodeWithSession ...")
	if reScheduler == nil {
		klog.V(util.LogErrorLev).Infof("AddFaultNodeWithSession: %s, nil reScheduler", util.ArgumentError)
		return
	}

	createTime := time.Now().Unix()
	faultNodes := make(map[string]*FaultNode, len(reScheduler.Nodes))
	collector := util.NewErrorCollector("AddFaultNodeWithSession", util.DefaultPrintLimit)
	for name, node := range reScheduler.Nodes {
		klog.V(util.LogDebugLev).Infof("Adding node %s to reScheduler cache", name)

		isNpuNode := util.IsMapHasNPUResource(node.Capability, util.HwPreName)
		var npuName string
		if isNpuNode {
			chipKind, kindErr := node.GetChipKindFromNpuNode()
			if kindErr != nil {
				collector.Add(name, fmt.Errorf("getChipKindFromNpuNode: %s", kindErr))
				continue
			}
			npuName = util.HwPreName + chipKind
		}

		fNode := newFaultNodeDefault(node.Name, createTime)
		fNode.IsNpuNode = isNpuNode
		fNode.NPUName = npuName
		fNode.SuperPodID = node.SuperPodID
		fNode.RackID = node.RackID
		fNode.updateFaultNodesFromDeviceInfo(&node)
		fNode.updateFaultNodesAttr(&node)
		faultNodes[name] = fNode

		// The node is still cached; the missing label is only reported.
		if !fNode.NodeDEnable {
			collector.Add(name, errors.New("isNodeDEnabled: the node label of nodeDEnable=on is not set"))
		}
	}
	collector.Print()
	reScheduler.setFaultNodes(faultNodes)
}
// RestartNeedForceDeleteJobs force deletes every job whose grace delete has
// timed out.
//
// It returns an error built from util.ArgumentError on a nil receiver or
// session, nil when there is nothing to delete, and any other error reported
// by GetNeedForceDeleteDelayingNPUJobs unchanged.
func (reScheduler *ReScheduler) RestartNeedForceDeleteJobs(ssn *framework.Session, env plugin.ScheduleEnv) error {
	klog.V(util.LogDebugLev).Infof("enter RestartNeedForceDeleteJobs...")
	defer klog.V(util.LogDebugLev).Infof("leave RestartNeedForceDeleteJobs...")
	if reScheduler == nil || ssn == nil {
		klog.V(util.LogErrorLev).Infof("RestartNeedForceDeleteJobs failed: %s, nil reScheduler or session",
			util.ArgumentError)
		return errors.New(util.ArgumentError)
	}

	timedOutJobs, err := reScheduler.GetNeedForceDeleteDelayingNPUJobs(reScheduler.Jobs, ssn)
	switch {
	case err == nil:
		// fall through to the deletion below
	case err.Error() == getNoneJobsErr:
		// "no jobs" is not a failure for the caller
		return nil
	default:
		return err
	}
	klog.V(util.LogDebugLev).Infof("GetNeedForceDeleteDelayingNPUJobs: %#v", timedOutJobs)

	for _, sJob := range timedOutJobs {
		for _, fJob := range reScheduler.FaultJobs {
			if sJob.Name == fJob.JobUID {
				klog.V(util.LogWarningLev).Infof("grace delete job %s is timeout,force delete.", sJob.Name)
				forceDeleteJob(sJob, fJob, ssn, env)
			}
		}
	}
	return nil
}
// forceDeleteJob force deletes schedulerJob through faultJob, using
// ForceDeleteJobFor910A5 when is910A5SuperPodJob reports true and
// ForceDeleteJob otherwise. A failure is logged, not returned.
func forceDeleteJob(schedulerJob plugin.SchedulerJob, faultJob *FaultJob, ssn *framework.Session,
	env plugin.ScheduleEnv) {
	var deleteErr error
	if is910A5SuperPodJob(&schedulerJob) {
		deleteErr = faultJob.ForceDeleteJobFor910A5(&schedulerJob, env)
	} else {
		deleteErr = faultJob.ForceDeleteJob(&schedulerJob, env)
	}
	if deleteErr != nil {
		klog.V(util.LogErrorLev).Infof("%s ForceDeleteJob: %s", schedulerJob.Name, util.SafePrint(deleteErr))
	}
}
// RestartFaultJobs restarts every fault job selected by getJobsToBeRestarted
// and rebuilds the fault job cache from the jobs that were not selected plus
// the selected jobs that are still present in reScheduler.Jobs.
//
// It returns an error built from util.ArgumentError on a nil receiver or
// session, otherwise nil.
func (reScheduler *ReScheduler) RestartFaultJobs(ssn *framework.Session, env plugin.ScheduleEnv) error {
	klog.V(util.LogDebugLev).Infof("enter RestartFaultJobs...")
	defer klog.V(util.LogDebugLev).Infof("leave RestartFaultJobs...")
	if reScheduler == nil || ssn == nil {
		klog.V(util.LogErrorLev).Infof("RestartFaultJobs failed: %s, nil reScheduler or nil session",
			util.ArgumentError)
		return errors.New(util.ArgumentError)
	}
	restartFaultJobs := reScheduler.getJobsToBeRestarted(reScheduler.getRealFaultJobs())
	newCacheJobs := reScheduler.getNewCacheJobs(restartFaultJobs)
	klog.V(util.LogDebugLev).Infof("Jobs to be restarted: %v", string(util.MarshalData(restartFaultJobs)))
	for _, restartFaultJob := range restartFaultJobs {
		schedulerJob, ok := reScheduler.Jobs[restartFaultJob.JobUID]
		if !ok {
			// The lookup failed, so schedulerJob is a zero value with an empty
			// name; identify the job through the fault job instead.
			klog.V(util.LogWarningLev).Infof("restartFaultJob %s not in session, has already been deleted",
				restartFaultJob.JobName)
			continue
		}
		reScheduler.doRestartJob(ssn, env, restartFaultJob, schedulerJob)
		newCacheJobs[restartFaultJob.JobUID] = restartFaultJob
	}
	reScheduler.setFaultJobs(newCacheJobs)
	return nil
}
// doRestartJob restarts restartFaultJob and, on success, updates the
// bookkeeping kept for it: the fault time of its relation-fault tasks, the
// recent reschedule records, DeleteExecutedFlag, and (for a PodFailed fault
// reason) the remaining retry counter.
//
// A restart failure is logged and leaves the bookkeeping untouched.
func (reScheduler *ReScheduler) doRestartJob(ssn *framework.Session, env plugin.ScheduleEnv,
	restartFaultJob *FaultJob, schedulerJob plugin.SchedulerJob) {
	klog.V(util.LogInfoLev).Infof("%s need restart.", restartFaultJob.JobName)
	if restartErr := restartFaultJob.restartSingleFaultJob(
		ssn, reScheduler, &schedulerJob, env); restartErr != nil {
		klog.V(util.LogErrorLev).Infof("RestartJob %s, err: %s.", schedulerJob.Name, util.SafePrint(restartErr))
		return
	}

	restartTime := time.Now().Unix()
	for i := range restartFaultJob.FaultTasks {
		fTask := &restartFaultJob.FaultTasks[i]
		if !fTask.IsFaultTask || fTask.faultType != util.RelationFault {
			continue
		}
		fTask.FaultTime = restartTime
	}
	restartFaultJob.recordFaultJobsToLogs()

	// Assigning into a nil map panics, so create the records map on demand.
	if reScheduler.JobRecentRescheduleRecords == nil {
		reScheduler.JobRecentRescheduleRecords = make(map[api.JobID]*RescheduleReason)
	}
	reScheduler.JobRecentRescheduleRecords[restartFaultJob.JobUID] =
		updateRescheduleReason(reScheduler.JobRecentRescheduleRecords[restartFaultJob.JobUID], restartFaultJob)
	restartFaultJob.DeleteExecutedFlag = true

	if restartFaultJob.faultReason == PodFailed {
		// The counter entry may be absent (never registered or removed by a
		// sync); only decrement one that exists.
		if remain, ok := reScheduler.JobRemainRetryTimes[restartFaultJob.JobUID]; ok && remain != nil {
			remain.Times -= 1
			klog.V(util.LogWarningLev).Infof("job<%s> restart success, "+
				"remain retry times reduce 1", restartFaultJob.JobUID)
		} else {
			klog.V(util.LogWarningLev).Infof("job<%s> restart success, "+
				"but no remain retry times record exists", restartFaultJob.JobUID)
		}
	}
	klog.V(util.LogWarningLev).Infof("delete %s pod execution success, set flag true", schedulerJob.Name)
}
// updateRescheduleReason prepends a record describing the current fault tasks
// of fJob to Reasons, bumps the total reschedule counter and keeps at most
// MaxRescheduleRecordsNum records.
//
// A nil Reasons is replaced by a new value carrying fJob.JobUID. A nil fJob
// yields a nil result.
func updateRescheduleReason(Reasons *RescheduleReason, fJob *FaultJob) *RescheduleReason {
	if fJob == nil {
		klog.V(util.LogErrorLev).Infof("cannot updateRescheduleReason cause nil FaultJob, err:%s", util.ArgumentError)
		return nil
	}
	if Reasons == nil {
		Reasons = &RescheduleReason{JobID: fJob.JobUID}
	}

	taskReasons := convertFaultTaskToRecords(fJob)
	stamp := time.Now()
	var latest RescheduleRecord
	latest.ReasonOfTask = taskReasons
	latest.RescheduleTimeStamp = stamp.Unix()
	latest.LogFileFormatTime = stamp.Format("I0102 15:04:05")

	// Newest record goes first.
	Reasons.RescheduleRecords = append([]RescheduleRecord{latest}, Reasons.RescheduleRecords...)
	Reasons.TotalRescheduleTimes += 1
	if len(Reasons.RescheduleRecords) > MaxRescheduleRecordsNum {
		Reasons.RescheduleRecords = Reasons.RescheduleRecords[:MaxRescheduleRecordsNum]
	}
	return Reasons
}
// convertFaultTaskToRecords returns one RescheduleTaskReason per fault task of
// fJob, capped at MaxRescheduleRecordsNum entries. The result is never nil;
// it is empty for a nil fJob.
func convertFaultTaskToRecords(fJob *FaultJob) []RescheduleTaskReason {
	records := make([]RescheduleTaskReason, 0)
	if fJob == nil {
		klog.V(util.LogErrorLev).Infof("cannot convertFaultTaskToRecords cause nil FaultJob, "+
			"err:%s", util.ArgumentError)
		return records
	}
	for i := range fJob.FaultTasks {
		task := &fJob.FaultTasks[i]
		if !task.IsFaultTask {
			continue
		}
		// Stop at the first fault task that would exceed the cap.
		if len(records) >= MaxRescheduleRecordsNum {
			klog.V(util.LogWarningLev).Infof(
				"there were more than %d task is fault task, will not record them "+
					"into configmap", MaxRescheduleRecordsNum)
			break
		}
		records = append(records, RescheduleTaskReason{
			RescheduleReason: task.faultType,
			PodName:          task.TaskName,
			NodeName:         task.NodeName,
			NodeRankIndex:    task.NodeRankIndex,
		})
	}
	return records
}
// updateResetConfigMapWithGraceExit reads the reset config map name in
// nameSpace, sets GracefulExit of the stored TaskResetInfo to exitCode and
// writes the config map back together with a fresh check code.
//
// The update sends a newly built ConfigMap object that carries only the
// "reset" label and the two data keys. Every failure is logged and ends the
// function; nothing is returned.
func updateResetConfigMapWithGraceExit(client kubernetes.Interface, name, nameSpace string, exitCode int) {
	currentCM, getErr := k8s.GetConfigMapWithRetry(client, nameSpace, name)
	if getErr != nil {
		klog.V(util.LogWarningLev).Infof("get reset cm err by:%s", getErr)
		return
	}
	rawInfo, found := currentCM.Data[plugin.ResetInfoCMDataKey]
	if !found {
		klog.V(util.LogWarningLev).Infof("get reset cm err by %s is not exist", plugin.ResetInfoCMDataKey)
		return
	}

	var resetInfo plugin.TaskResetInfo
	if decodeErr := json.Unmarshal([]byte(rawInfo), &resetInfo); decodeErr != nil {
		klog.V(util.LogWarningLev).Infof("get reset cm unmarshal err:%s", decodeErr)
		return
	}
	resetInfo.GracefulExit = exitCode

	// The check code is computed from the struct, before it is serialised.
	checkCode := util.MakeDataHash(resetInfo)
	encoded, encodeErr := json.Marshal(resetInfo)
	if encodeErr != nil {
		klog.V(util.LogWarningLev).Infof("get reset cm marshal err:%s", encodeErr)
		return
	}

	newCM := &v1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: nameSpace,
			Labels:    map[string]string{"reset": "true"},
		},
		Data: map[string]string{
			CmCheckCode:               checkCode,
			plugin.ResetInfoCMDataKey: string(encoded),
		},
	}
	if _, updateErr := client.CoreV1().ConfigMaps(nameSpace).
		Update(context.TODO(), newCM, metav1.UpdateOptions{}); updateErr != nil {
		klog.V(util.LogWarningLev).Infof("set update reset cm err:%s", updateErr)
		return
	}
	klog.V(util.LogInfoLev).Infof("set update reset cm<%s/%s> success, data: %v", newCM.Namespace, newCM.Name,
		newCM.Data)
}
// getNewCacheJobs returns a new map holding every cached fault job whose
// JobUID is not a key of restartFaultJobs, keyed by JobUID.
func (reScheduler *ReScheduler) getNewCacheJobs(restartFaultJobs map[api.JobID]*FaultJob) map[api.JobID]*FaultJob {
	remaining := make(map[api.JobID]*FaultJob)
	for _, cached := range reScheduler.FaultJobs {
		if _, restarting := restartFaultJobs[cached.JobUID]; !restarting {
			remaining[cached.JobUID] = cached
		}
	}
	return remaining
}
// reduceForSubHealthyNodes lowers, in place, the score of every node in smp
// that is a known fault node with a card or switch sub-health fault. The
// score is reduced by util.AffScore1 and never drops below zero.
func (reScheduler *ReScheduler) reduceForSubHealthyNodes(smp map[string]float64) {
	// Ranging over a nil map performs no iterations.
	for nodeName, current := range smp {
		fNode, known := reScheduler.FaultNodes[nodeName]
		if known && (fNode.HasCardSubHealthFault || fNode.HasSwitchSubHealthFault) {
			smp[nodeName] = math.Max(0.0, current-util.AffScore1)
		}
	}
}
// ScoreBestNPUNodes adjusts scoreMap in place for task: sub-healthy nodes are
// always penalised, and when the task's job is a cached fault job the nodes
// used by its fault tasks are penalised as well.
func (reScheduler *ReScheduler) ScoreBestNPUNodes(task *api.TaskInfo, scoreMap map[string]float64) {
	if reScheduler == nil || task == nil || len(scoreMap) == 0 {
		klog.V(util.LogErrorLev).Infof("ScoreBestNPUNodes: %s, nil reScheduler or task or scoreMap",
			util.ArgumentError)
		return
	}
	klog.V(util.LogDebugLev).Infof("enter rescheduling ScoreBestNPUNodes %s...", task.Name)
	klog.V(util.LogDebugLev).Infof("node score map before add rescheduling weights %#v", scoreMap)
	defer klog.V(util.LogDebugLev).Infof("leave rescheduling ScoreBestNPUNodes ...")

	reScheduler.reduceForSubHealthyNodes(scoreMap)

	cachedJob := reScheduler.FaultJobs[task.Job]
	switch {
	case cachedJob == nil:
		klog.V(util.LogInfoLev).Infof("task %s is not in rescheduler cache", task.Name)
		return
	case !cachedJob.IsFaultJob:
		klog.V(util.LogDebugLev).Infof("task %s belongs to job %s which is not a fault job", task.Name,
			cachedJob.JobName)
		return
	}

	reScheduler.reduceScoreForLastFaultNode(cachedJob, scoreMap)
	klog.V(util.LogDebugLev).Infof("node score map after reduce rescheduling weights %#v", scoreMap)
}
// UseAnnotation marks task's pod with the reschedule-in-place annotation when
// the task matches (name, namespace and node) one of the fault tasks cached
// for its job.
//
// Nothing is done when the job is unknown to the rescheduler, when every task
// of the job is being scheduled (SchedulingTaskNum equals the task count), or
// when the task has no pod.
func (reScheduler *ReScheduler) UseAnnotation(task *api.TaskInfo) {
	if reScheduler == nil || task == nil {
		klog.V(util.LogErrorLev).Infof("UseAnnotation: %s, nil reScheduler or task",
			util.ArgumentError)
		return
	}
	klog.V(util.LogDebugLev).Infof("enter rescheduling UseAnnotation %s...", task.Name)
	defer klog.V(util.LogDebugLev).Infof("leave rescheduling UseAnnotation %s...", task.Name)
	vcJob, ok := reScheduler.Jobs[task.Job]
	if !ok {
		return
	}
	fJob, ok := reScheduler.FaultJobs[task.Job]
	if !ok || fJob == nil {
		return
	}
	if vcJob.SchedulingTaskNum == len(vcJob.Tasks) {
		return
	}
	if task.Pod == nil {
		return
	}
	for _, fTask := range fJob.FaultTasks {
		if fTask.TaskName != task.Name || fTask.TaskNamespace != task.Namespace ||
			fTask.NodeName != task.NodeName {
			continue
		}
		// Assigning into a nil map panics, so create the annotations on demand.
		if task.Pod.Annotations == nil {
			task.Pod.Annotations = make(map[string]string)
		}
		task.Pod.Annotations[rescheduleInPlaceKey] = rescheduleInPlaceValue
		return
	}
}
// reduceScoreForLastFaultNode lowers, in place, the score of every node in
// scoreMap that hosts a fault task of faultJob. The score is reduced by
// util.AffScore8 squared and clamped at zero.
func (reScheduler *ReScheduler) reduceScoreForLastFaultNode(faultJob *FaultJob, scoreMap map[string]float64) {
	if scoreMap == nil {
		klog.V(util.LogWarningLev).Infof("reduceScoreForLastFaultNode: scoreMap is nil.")
		return
	}
	for _, nodeName := range reScheduler.getFaultNodeNameByFaultJob(faultJob) {
		current, scored := scoreMap[nodeName]
		if !scored {
			continue
		}
		klog.V(util.LogDebugLev).Infof("fault node<%s> previous used score is reduce", nodeName)
		reduced := current - util.AffScore8*util.AffScore8
		if reduced < 0 {
			reduced = 0
		}
		scoreMap[nodeName] = reduced
	}
}
// CheckNodeNPUByTask is the rescheduling predicate: it returns nil when vcNode
// is acceptable for task and an error describing the rejection otherwise.
//
// A nil vcNode or task is rejected with an error before either is
// dereferenced.
func (reScheduler *ReScheduler) CheckNodeNPUByTask(task *api.TaskInfo, vcNode *plugin.NPUNode) error {
	// Validate the arguments first: the log statements below read their fields.
	if vcNode == nil {
		return fmt.Errorf("node is not a vc node")
	}
	if task == nil {
		return errors.New(util.ArgumentError)
	}
	klog.V(util.LogDebugLev).Infof("enter rescheduling CheckNodeNPUByTask ...(%s, %s)", task.Name, vcNode.Name)
	defer klog.V(util.LogDebugLev).Infof("leave rescheduling CheckNodeNPUByTask ...(%s, %s)",
		task.Name, vcNode.Name)
	if err := reScheduler.checkNodeCurNodeIsFault(vcNode, task); err != nil {
		return err
	}
	klog.V(util.LogDebugLev).Infof("CheckNodeNPUByTask node %s passed rescheduling predicate for task %s",
		vcNode.Name, task.Name)
	return nil
}
// checkNodeCurNodeIsFault decides whether task may be placed on vcNode.
//
// It returns an error when the task's job or the node is unknown to the
// rescheduler, when the node is unhealthy or pre-separated, or when the node
// is sub-healthy and the job's strategy does not allow that. For a node whose
// link went down less than linkDownFaultTimeout seconds ago, the L1 link-down
// cards are prepended to the node's network-unhealthy card annotation and the
// node is still accepted. A nil receiver accepts every node.
func (reScheduler *ReScheduler) checkNodeCurNodeIsFault(vcNode *plugin.NPUNode, task *api.TaskInfo) error {
	if reScheduler == nil {
		return nil
	}
	schedulerJob, ok := reScheduler.Jobs[task.Job]
	if !ok {
		return fmt.Errorf("task corresponding job not in session")
	}
	fNode, exist := reScheduler.FaultNodes[vcNode.Name]
	if !exist {
		return fmt.Errorf("node corresponding not in session")
	}
	if fNode.NodeHealthState == NodeUnhealthy {
		return fmt.Errorf("node is unhealthy")
	}
	nodeHealthyStatusByNodeD := vcNode.Annotation[util.NodeHealthyStatusKey]
	if nodeHealthyStatusByNodeD == util.PreSeparateFaultCode {
		return fmt.Errorf("node health status is %s", nodeHealthyStatusByNodeD)
	}
	if !reScheduler.isJobCanAssignToSubHealthNode(schedulerJob.SubHealthyStrategy,
		fNode.HasCardSubHealthFault || fNode.HasSwitchSubHealthFault) {
		// Keep a space at the split so the message does not read "andswitchSubHealthy".
		return fmt.Errorf("NodePredicate failed, cardSubHealthy=%v and "+
			"switchSubHealthy=%v, but sub-healthy strategy is %v", fNode.HasCardSubHealthFault,
			fNode.HasSwitchSubHealthFault, schedulerJob.SubHealthyStrategy)
	}
	if fNode.LinkDownTime == 0 {
		klog.V(util.LogDebugLev).Infof("node %s is not fault node, check success", vcNode.Name)
		return nil
	}
	if time.Now().Unix()-fNode.LinkDownTime < linkDownFaultTimeout {
		klog.V(util.LogWarningLev).Infof("the node is fault node, node name=%s", vcNode.Name)
		networkUnhealthyCardName := fmt.Sprintf("%s-%s", fNode.NPUName, CardNetworkUnhealthy)
		k := vcNode.Annotation[networkUnhealthyCardName]
		l1LinkCards := fNode.getL1LinkDownCards()
		if len(l1LinkCards) == 0 {
			return nil
		}
		// Assigning into a nil map panics, so create the annotations on demand.
		if vcNode.Annotation == nil {
			vcNode.Annotation = make(map[string]string)
		}
		if k == "" {
			vcNode.Annotation[networkUnhealthyCardName] = strings.Join(l1LinkCards, ",")
		} else {
			vcNode.Annotation[networkUnhealthyCardName] = strings.Join(l1LinkCards, ",") + "," + k
		}
	}
	klog.V(util.LogDebugLev).Infof("node %s is not fault node, check success", vcNode.Name)
	return nil
}
// isJobCanAssignToSubHealthNode reports whether a job with the given
// sub-healthy strategy may be placed on a node. A node that is not
// sub-healthy is always allowed; a sub-healthy node only when the strategy is
// util.SubHealthyIgnore.
func (reScheduler *ReScheduler) isJobCanAssignToSubHealthNode(jobSubHealthStrategy string, nodeSubHealth bool) bool {
	return !nodeSubHealth || jobSubHealthStrategy == util.SubHealthyIgnore
}
// getFaultNodeNameByFaultJob returns the node name of every fault task of
// faultJob, in task order. The result is never nil.
func (reScheduler ReScheduler) getFaultNodeNameByFaultJob(faultJob *FaultJob) []string {
	names := make([]string, 0)
	for i := range faultJob.FaultTasks {
		if task := &faultJob.FaultTasks[i]; task.IsFaultTask {
			names = append(names, task.NodeName)
		}
	}
	return names
}
// setTaskCardHealthCode recomputes fTask.Reason (and fTask.HasSubHealthFault)
// from the cached fault node the task runs on.
//
// It returns an error, after storing an empty reason list, when the task has
// no node name. When no cached fault node matches, only a software fault
// reason (if any) is recorded.
func (reScheduler ReScheduler) setTaskCardHealthCode(fTask *FaultTask) error {
	klog.V(util.LogDebugLev).Infof("task %s setTaskCardHealthCode", fTask.TaskName)
	reasons := make([]FaultReasonList, 0)
	if fTask.NodeName == "" {
		fTask.Reason = reasons
		return fmt.Errorf("setTaskCardHealthCode fTask %s use node is nil", fTask.TaskName)
	}

	for _, node := range reScheduler.FaultNodes {
		if node.NodeName != fTask.NodeName {
			continue
		}
		// An unhealthy NPU node contributes a node-level reason of its own.
		if node.IsNpuNode && node.NodeHealthState == NodeUnhealthy {
			var nodeReason FaultReasonList
			nodeReason.NodeName = node.NodeName
			nodeReason.FaultType = NodeUnhealthy
			nodeReason.FaultCode = NodeFaultCode
			nodeReason.FaultLevel = PreSeparateNPU
			nodeReason.FaultHandling = PreSeparateNPU
			nodeReason.LargeModelFaultLevel = PreSeparateNPU
			reasons = append(reasons, nodeReason)
		}
		// Must be set before the card scan, which may raise the flag to true.
		fTask.HasSubHealthFault = node.HasSwitchSubHealthFault ||
			fTask.RelationFault == util.SubHealthFaultStrategy
		reasons = append(reasons, setTaskFaultReasonByFaultNode(fTask, node)...)
		break
	}

	if fTask.IsSoftwareFault {
		reasons = append(reasons, getTaskSoftwareFaultReason(fTask))
	}
	fTask.Reason = reasons
	return nil
}
// getTaskSoftwareFaultReason builds the reason entry recorded for a task-level software fault.
// Only the node name, task name and fault rank list are filled in; all other fields keep their zero value.
func getTaskSoftwareFaultReason(fTask *FaultTask) FaultReasonList {
	var reason FaultReasonList
	reason.NodeName = fTask.NodeName
	reason.TaskName = fTask.TaskName
	reason.FaultRankList = fTask.initFaultRankIndex()
	return reason
}
// setTaskFaultReasonByFaultNode returns one reason entry for every fault device of fNode whose NPUName
// equals a card used by fTask, skipping devices whose FaultHandling is NotHandleFault.
// Entries are ordered by the task's card list first and the node's fault device list second.
// Side effect: fTask.HasSubHealthFault is set to true when a matched device has SubHealthFault handling.
func setTaskFaultReasonByFaultNode(fTask *FaultTask, fNode *FaultNode) []FaultReasonList {
	reasons := make([]FaultReasonList, 0)
	for _, usedCard := range fTask.UseCardName {
		for _, device := range fNode.FaultDeviceList {
			if usedCard != device.NPUName {
				continue
			}
			if device.FaultHandling == NotHandleFault {
				continue
			}
			if device.FaultHandling == SubHealthFault {
				fTask.HasSubHealthFault = true
			}
			var entry FaultReasonList
			entry.NodeName = fNode.NodeName
			entry.TaskName = fTask.TaskName
			entry.FaultRankList = fTask.initFaultRankIndex()
			entry.FaultDeviceList = device
			reasons = append(reasons, entry)
		}
	}
	return reasons
}
// updateJobHealthCode refreshes the fault reasons of every task in fJob via setTaskCardHealthCode.
// Per-task errors do not stop the loop; they are gathered in an error collector keyed by task name
// and printed once all tasks have been processed. A nil fJob is a no-op.
func (reScheduler ReScheduler) updateJobHealthCode(fJob *FaultJob) {
	if fJob == nil {
		return
	}
	collector := util.NewErrorCollector("updateJobHealthCode", util.DefaultPrintLimit)
	for i := range fJob.FaultTasks {
		// Take the element's address so the update lands in the job's own slice, not in a copy.
		faultTask := &fJob.FaultTasks[i]
		err := reScheduler.setTaskCardHealthCode(faultTask)
		if err == nil {
			continue
		}
		collector.Add(faultTask.TaskName, err)
	}
	collector.Print()
}
// getTaskHealthState decides whether fTask is faulty and returns the matching state string.
// A task without a node name is reported as not faulty with NodeHealthy. Otherwise the checks run in
// this order and the first one that fires wins:
//  1. node / card state (getTaskHealthStateByNode);
//  2. DPU state (getTaskHealthStateByNodeDpu), reported as util.DpuFault;
//  3. pod state (getTaskHealthStateByPod), only honoured when fTask.IsFaultRetryEnable is set;
//  4. fTask.RelationFault equal to util.SeparateFaultStrategy, reported as util.RelationFault;
//  5. the sub-health evaluation of the task for subHealthyStrategy.
func (reScheduler ReScheduler) getTaskHealthState(fTask *FaultTask, task *api.TaskInfo,
	subHealthyStrategy string) (bool, string) {
	klog.V(util.LogDebugLev).Infof("task %s getTaskHealthState", fTask.TaskName)
	if fTask.NodeName == "" {
		return false, NodeHealthy
	}

	nodeFault, nodeState := reScheduler.getTaskHealthStateByNode(fTask)
	if nodeFault {
		return nodeFault, nodeState
	}

	dpuFault := reScheduler.getTaskHealthStateByNodeDpu(fTask)
	if dpuFault {
		return dpuFault, util.DpuFault
	}

	podFault, podState := reScheduler.getTaskHealthStateByPod(task)
	if podFault && fTask.IsFaultRetryEnable {
		return podFault, podState
	}

	if fTask.RelationFault == util.SeparateFaultStrategy {
		return true, util.RelationFault
	}
	return fTask.getTaskHealthStateBySubHealth(subHealthyStrategy)
}
// getTaskHealthStateByNode derives the task's health from the node it runs on.
// For each node returned by GetRealFaultNodes whose name matches the task's node:
//   - a node that is not a fault node yields (false, NodeHealthy) immediately;
//   - a node in the NodeUnhealthy state yields (true, NodeUnhealthy) immediately;
//   - otherwise the health states of the cards used by the task are recorded.
//
// After the scan, (true, NodeCardUnhealthy) is returned when the recorded card states contain
// NodeCardUnhealthy. If the task's node is missing from reScheduler.Nodes and this is not the first
// session, the task is reported as (true, NodeUnhealthy). In every other case (false, NodeHealthy).
func (reScheduler *ReScheduler) getTaskHealthStateByNode(fTask *FaultTask) (bool, string) {
	cardStates := make([]string, 0)
	for _, fNode := range reScheduler.GetRealFaultNodes() {
		if fNode.NodeName != fTask.NodeName {
			continue
		}
		if !fNode.IsFaultNode {
			klog.V(util.LogInfoLev).Infof("task %s use healthy node %s, thus task sets %s", fTask.TaskName,
				fNode.NodeName, NodeHealthy)
			return false, NodeHealthy
		}
		if fNode.NodeHealthState == NodeUnhealthy {
			klog.V(util.LogInfoLev).Infof("task %s use %s node %s, thus task sets %s", fTask.TaskName,
				NodeUnhealthy, fNode.NodeName, NodeUnhealthy)
			return true, NodeUnhealthy
		}
		// No break here: the scan continues, so a later entry with the same name would overwrite this.
		cardStates = fTask.getTaskUseFaultCardHealthState(fNode)
	}

	if util.IsSliceContain(NodeCardUnhealthy, cardStates) {
		klog.V(util.LogInfoLev).Infof("task %s use %s node, thus task sets %s", fTask.TaskName,
			NodeCardUnhealthy, NodeCardUnhealthy)
		return true, NodeCardUnhealthy
	}

	_, nodeKnown := reScheduler.Nodes[fTask.NodeName]
	// isFirstSession is only dereferenced when the node is unknown, exactly like "!ok && !*flag".
	if !(nodeKnown || *reScheduler.isFirstSession) {
		klog.V(util.LogErrorLev).Infof("task %s use node(%s) which is not ready thus task sets %s", fTask.TaskName,
			fTask.NodeName, NodeUnhealthy)
		return true, NodeUnhealthy
	}

	klog.V(util.LogDebugLev).Infof("task %s all nodes healthy, thus task sets %s", fTask.TaskName, NodeHealthy)
	return false, NodeHealthy
}
// getTaskHealthStateByPod maps the result of isFailedTask to a fault flag and state string:
// (true, PodFailed) when the task is reported as failed, (false, PodHealthy) otherwise.
func (reScheduler *ReScheduler) getTaskHealthStateByPod(task *api.TaskInfo) (bool, string) {
	if !isFailedTask(task) {
		return false, PodHealthy
	}
	return true, PodFailed
}
// getJobsToBeRestarted filters realFaultJobs down to the jobs that should be rescheduled, keyed by JobUID.
// A job is skipped when it is absent from reScheduler.Jobs or when its DeleteExecutedFlag is already true.
// Side effect: for every job found in reScheduler.Jobs (including ones skipped afterwards because of
// DeleteExecutedFlag) TpBlock is copied from the cached job, with inValidTpBlock replaced by
// forceRackAffinityLimit.
func (reScheduler ReScheduler) getJobsToBeRestarted(realFaultJobs map[api.JobID]*FaultJob) map[api.JobID]*FaultJob {
	jobsToRestart := make(map[api.JobID]*FaultJob)
	for _, faultJob := range realFaultJobs {
		cachedJob, found := reScheduler.Jobs[faultJob.JobUID]
		if !found {
			klog.V(util.LogDebugLev).Infof(
				"rescheduling: skip job <%s/%s> because the job is not found in rescheduler cache",
				faultJob.JobNamespace, faultJob.JobName)
			continue
		}

		// TpBlock is synchronised before the DeleteExecutedFlag check, so skipped jobs are updated too.
		faultJob.TpBlock = cachedJob.TpBlock
		if faultJob.TpBlock == inValidTpBlock {
			faultJob.TpBlock = forceRackAffinityLimit
		}

		if faultJob.DeleteExecutedFlag {
			klog.V(util.LogDebugLev).Infof(
				"rescheduling: skip job <%s/%s> because the DeleteExecutedFlag for job is true",
				faultJob.JobNamespace, faultJob.JobName)
			continue
		}

		klog.V(util.LogDebugLev).Infof(
			"rescheduling: job <%s/%s> will be rescheduled", faultJob.JobNamespace, faultJob.JobName)
		jobsToRestart[faultJob.JobUID] = faultJob
	}
	return jobsToRestart
}