package jobv2
import (
"k8s.io/api/core/v1"
"volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"ascend-common/common-utils/hwlog"
"clusterd/pkg/application/jobinfo"
"clusterd/pkg/common/constant"
"clusterd/pkg/common/util"
"clusterd/pkg/domain/job"
"clusterd/pkg/domain/pod"
"clusterd/pkg/domain/podgroup"
)
func addJob(jobKey string) {
podGroupCache := podgroup.GetPodGroup(jobKey)
if podGroupCache.Name != "" && len(pod.GetPodByJobId(jobKey)) > 0 {
uniqueQueue.Store(jobKey, queueOperatorUpdate)
return
}
oldJobInfo := job.GetJobByNameSpaceAndName(podgroup.GetJobNameByPG(&podGroupCache), podGroupCache.Namespace)
if oldJobInfo.Name != "" && oldJobInfo.IsPreDelete && oldJobInfo.Key != jobKey {
job.DeleteJobCache(oldJobInfo.Key)
}
jobInfo, ok := job.GetJobCache(jobKey)
if !ok {
job.InitCmAndCache(podGroupCache, nil)
if jobCache, ok := job.GetJobCache(jobKey); ok {
jobinfo.SendJobInfoSignal(jobinfo.BuildJobSignalFromJobInfo(jobCache, constant.DefaultHcclJson,
constant.AddOperator))
}
return
}
if jobInfo.Status == job.StatusJobPending {
return
}
uniqueQueue.Store(jobKey, queueOperatorUpdate)
}
func updateJob(jobKey string) {
pg := podgroup.GetPodGroup(jobKey)
jobInfo, ok := job.GetJobCache(jobKey)
if !ok && pg.Name == "" {
hwlog.RunLog.Debugf("job cache is empty and podGroup is empty, skip %s message", jobKey)
return
}
podsInJob := pod.GetPodByJobId(jobKey)
isPreDelete, status := getStatusByCache(pg, podsInJob, jobInfo)
podsInJob, needRebuildJobSummary := preHandlePods(podsInJob)
if !needRebuildJobSummary && ok && jobInfo.Status == status && jobInfo.IsPreDelete == isPreDelete {
hwlog.RunLog.Debugf("the job %s cache is consistent with pod and podGroup cache", jobInfo.Name)
return
}
if isPreDelete {
if !jobInfo.IsPreDelete {
hwlog.RunLog.Debugf("job %s updateJob to preDeleteJob", jobInfo.Name)
job.PreDeleteCmAndCache(jobKey)
if jobCache, ok := job.GetJobCache(jobKey); ok {
jobinfo.SendJobInfoSignal(jobinfo.BuildJobSignalFromJobInfo(jobCache,
util.ObjToString(jobCache.JobRankTable), constant.DeleteOperator))
}
}
return
}
if status == job.StatusJobPending && jobInfo.Status != job.StatusJobPending {
hwlog.RunLog.Debugf("job %s updateJob to addJob", jobInfo.Name)
job.InitCmAndCache(pg, podsInJob)
if jobCache, ok := job.GetJobCache(jobKey); ok {
jobinfo.SendJobInfoSignal(jobinfo.BuildJobSignalFromJobInfo(jobCache,
constant.DefaultHcclJson, constant.AddOperator))
}
return
}
if needRebuildJobSummary || jobInfo.Status != status {
job.UpdateCmAndCache(status, jobKey, pg, podsInJob)
if jobCache, ok := job.GetJobCache(jobKey); ok {
jobinfo.SendJobInfoSignal(jobinfo.BuildJobSignalFromJobInfo(jobCache,
util.ObjToString(jobCache.JobRankTable), constant.AddOperator))
}
return
}
hwlog.RunLog.Warnf("this logic branch is unreachable, there must have been some issues with the code."+
"isPreDelete: %v, status: %s, job name: %s, job.isPreDelete: %v, job.status: %s",
isPreDelete, status, jobInfo.Name, jobInfo.IsPreDelete, jobInfo.Status)
}
func preHandlePods(podsInJob map[string]v1.Pod) (map[string]v1.Pod, bool) {
res := make(map[string]v1.Pod)
newPodsMap := new(map[string]v1.Pod)
err := util.DeepCopy(newPodsMap, podsInJob)
if err != nil {
hwlog.RunLog.Errorf("deep copy podsInJob failed, err: %v", err)
return podsInJob, true
}
needRebuild := false
for _, p := range *newPodsMap {
if pod.IsBackupPodAfterSourcePodDeleted(p.Name) {
pod.DeleteFromBackupPodsMaps(p.Name)
needRebuild = true
} else if pod.IsNewPodForHotSwitch(&p) {
continue
}
res[string(p.UID)] = p
}
return res, needRebuild
}
func getStatusByCache(podGroup v1beta1.PodGroup, podsInJob map[string]v1.Pod, jobInfo constant.JobInfo) (bool, string) {
if podGroup.Name == "" && len(podsInJob) == 0 {
return true, getStatusByOldStatus(jobInfo)
}
if len(podsInJob) == 0 {
return false, getStatusByOldStatus(jobInfo)
}
isFailed := false
isSuccess := true
isRunning := true
for _, p := range podsInJob {
if p.Status.Phase == v1.PodFailed {
isFailed = true
isSuccess = false
isRunning = false
break
}
if p.Status.Phase != v1.PodSucceeded {
isSuccess = false
}
if (p.Status.Phase != v1.PodRunning && p.Status.Phase != v1.PodSucceeded) ||
!pod.DeviceAllocateIsCompleted(p) {
isRunning = false
}
}
if isFailed {
return false, job.StatusJobFail
}
if isSuccess && len(podsInJob) >= max(int(podGroup.Spec.MinMember), pod.GetMinMember(podsInJob)) {
return false, job.StatusJobCompleted
}
minMember := max(int(podGroup.Spec.MinMember), pod.GetMinMember(podsInJob))
if isRunning && len(podsInJob) >= minMember && minMember > 0 {
return false, job.StatusJobRunning
}
return false, getStatusByOldStatus(jobInfo)
}
func getStatusByOldStatus(jobInfo constant.JobInfo) string {
switch jobInfo.Status {
case job.StatusJobRunning:
return job.StatusJobFail
case job.StatusJobCompleted:
return job.StatusJobCompleted
case job.StatusJobFail:
return job.StatusJobFail
default:
return job.StatusJobPending
}
}
func preDeleteJob(jobKey string) {
jobInfo, ok := job.GetJobCache(jobKey)
if !ok {
return
}
if jobInfo.IsPreDelete {
return
}
podsInJob := pod.GetPodByJobId(jobKey)
if len(podsInJob) > 0 {
uniqueQueue.Store(jobKey, queueOperatorUpdate)
return
}
job.PreDeleteCmAndCache(jobKey)
if jobCache, ok := job.GetJobCache(jobKey); ok {
jobinfo.SendJobInfoSignal(jobinfo.BuildJobSignalFromJobInfo(jobCache,
util.ObjToString(jobCache.JobRankTable), constant.DeleteOperator))
}
}
func deleteJob(joKey string) {
hwlog.RunLog.Debugf("delete job %s", joKey)
job.DeleteCmAndCache(joKey)
}