package job
import (
"encoding/json"
"strings"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"ascend-common/api"
"ascend-common/common-utils/hwlog"
"clusterd/pkg/common/constant"
"clusterd/pkg/domain/custom"
"clusterd/pkg/domain/pod"
"clusterd/pkg/domain/podgroup"
)
const (
	// cmDataInitLength is not referenced in the visible part of this file;
	// NOTE(review): presumably an initial capacity for configmap data — confirm with its users.
	cmDataInitLength = 16
	// safeDeviceSize is the maximum number of rank-table servers serialized into a
	// single configmap; larger server lists are split across several configmaps
	// (see TotalCmNum and getHcclSlice).
	safeDeviceSize = 1000
	// vcJobKind is the JobType value for which the master address is taken from the
	// first rank-table server instead of a pod environment variable
	// (presumably Volcano "Job" resources — confirm against podgroup.GetJobTypeByPG).
	vcJobKind = "Job"
	// masterAddr is the name of the pod environment variable read to obtain the
	// master address of jobs whose JobType is not vcJobKind.
	masterAddr = "MASTER_ADDR"
)

// Job status values stored in JobInfo.Status.
const (
	// StatusJobRunning is the status checked by GetJobIsRunning.
	StatusJobRunning = "running"
	// StatusJobPending is the status assigned when a job's configmap is first initialized.
	StatusJobPending = "pending"
	// StatusJobFail is assigned on pre-delete to any job that has not completed.
	StatusJobFail = "failed"
	// StatusJobCompleted marks a finished job; it is preserved on pre-delete.
	StatusJobCompleted = "complete"
)

const (
	// StatusRankTableInit is the rank-table status while not all replicas are complete.
	StatusRankTableInit = "initializing"
	// StatusRankTableComplete is the rank-table status once the completed pod count
	// equals the job's replica count.
	StatusRankTableComplete = "complete"
	// CustomJobID is the podgroup annotation key copied into JobInfo.CustomJobID.
	CustomJobID = "custom-job-id"
)
// PreDeleteCmAndCache marks the cached job identified by jobKey as pre-deleted and
// pushes that state to its configmap(s) through preDeleteCM. A job whose status is
// not StatusJobCompleted is marked StatusJobFail. AddTime is filled in if it was
// never set. The cache entry is overwritten only when preDeleteCM reports success.
// An unknown jobKey is a no-op.
func PreDeleteCmAndCache(jobKey string) {
	jobInfo, ok := GetJobCache(jobKey)
	if !ok {
		return
	}
	// Take one timestamp so AddTime (when unset), DeleteTime and LastUpdatedCmTime
	// agree with each other even if the wall clock crosses a second boundary.
	now := time.Now().Unix()
	if jobInfo.AddTime == 0 {
		jobInfo.AddTime = now
	}
	jobInfo.IsPreDelete = true
	if jobInfo.Status != StatusJobCompleted {
		jobInfo.Status = StatusJobFail
	}
	jobInfo.DeleteTime = now
	jobInfo.LastUpdatedCmTime = now
	hccls := getHcclSlice(jobInfo.JobRankTable)
	if preDeleteCM(jobInfo, hccls) {
		hwlog.RunLog.Debugf("pre delete job:%s success", jobInfo.Name)
		SaveJobCache(jobKey, jobInfo)
	}
}
// DeleteCmAndCache drops the job identified by jobKey from the local cache.
//
// If another cached, non-pre-deleted job has the same name and namespace, only the
// cache entry is removed and deleteCm is not called (the configmap is presumably
// shared by name/namespace — confirm). Otherwise deleteCm runs first, and the cache
// entry is removed only if it succeeds. An unknown jobKey is a no-op.
func DeleteCmAndCache(jobKey string) {
	info, found := GetJobCache(jobKey)
	if !found {
		return
	}
	if sameName := GetJobByNameSpaceAndNameAndPreDelete(info.Name, info.NameSpace, false); len(sameName) > 0 {
		hwlog.RunLog.Infof("job(%s) with same name, only delete local cache", info.Name)
		DeleteJobCache(jobKey)
		return
	}
	if !deleteCm(info) {
		return
	}
	hwlog.RunLog.Debugf("delete job:%s success", info.Name)
	DeleteJobCache(jobKey)
}
// InitCmAndCache builds the initial JobInfo for podGroup and hands it to initCM.
// The JobInfo has status StatusJobPending, is not pre-deleted and has an empty rank
// table. It is saved to the job cache only when initCM succeeds. A podGroup with an
// empty name or without owner references is rejected with an error log.
func InitCmAndCache(podGroup v1beta1.PodGroup, podsInJob map[string]v1.Pod) {
	if podGroup.Name == "" || len(podGroup.GetOwnerReferences()) == 0 {
		hwlog.RunLog.Error("podGroup is nil, init configmap failed")
		return
	}
	info := getJobBasicInfoByPG(podGroup, podsInJob)
	info.Status, info.IsPreDelete = StatusJobPending, false
	info.JobRankTable = constant.RankTable{}
	info.LastUpdatedCmTime = time.Now().Unix()
	if !initCM(info) {
		return
	}
	hwlog.RunLog.Debugf("init job:%s success", info.Name)
	SaveJobCache(info.Key, info)
}
// getSidFromLabels extracts a job sid from a label map. Sources, in order:
//  1. indirection: the value of constant.CustomJobKeyLabel names another label,
//     and that label's value is used;
//  2. the value of constant.CustomJobIdLabel.
//
// Values are whitespace-trimmed and blank values are ignored. The result is "" when
// no source yields a value. Reading a nil map is safe in Go, so nil labels also
// yield "".
func getSidFromLabels(labels map[string]string) string {
	if keyName := labels[constant.CustomJobKeyLabel]; strings.TrimSpace(keyName) != "" {
		if sid := strings.TrimSpace(labels[keyName]); sid != "" {
			return sid
		}
	}
	return strings.TrimSpace(labels[constant.CustomJobIdLabel])
}
// getSidForJobInfo resolves the sid for a job:
//  1. the sid in the podgroup's own labels, if any;
//  2. otherwise a sid found in the labels of the job's pods;
//  3. otherwise the job key derived from the podgroup.
//
// When several pods carry a sid, the one from the pod whose map key sorts first is
// used. The result therefore depends only on the input, not on Go's randomized map
// iteration order.
func getSidForJobInfo(pgInfo v1beta1.PodGroup, podsInJob map[string]v1.Pod) string {
	if sid := getSidFromLabels(pgInfo.Labels); sid != "" {
		return sid
	}
	var chosenKey, chosenSid string
	for podKey, p := range podsInJob {
		sid := getSidFromLabels(p.Labels)
		if sid == "" {
			continue
		}
		// chosenSid == "" means nothing was chosen yet (getSidFromLabels never returns a blank sid).
		if chosenSid == "" || podKey < chosenKey {
			chosenKey, chosenSid = podKey, sid
		}
	}
	if chosenSid != "" {
		return chosenSid
	}
	sid := podgroup.GetJobKeyByPG(&pgInfo)
	hwlog.RunLog.Debugf("no sid found in labels, use jobId:%s", sid)
	return sid
}
// getJobBasicInfoByPG derives the static part of a JobInfo from a podgroup and its
// pods: identity, sizing, classification and sid.
//   - identity: key, name, namespace, podgroup name;
//   - sizing: replicas and the number of configmaps needed;
//   - classification: job type, framework, resource type, MindIE ids.
//
// AddTime is set to the current time. Status and rank table are left at their zero
// values for the caller to fill in.
func getJobBasicInfoByPG(pgInfo v1beta1.PodGroup, podsInJob map[string]v1.Pod) constant.JobInfo {
	key, name := podgroup.GetJobKeyAndNameByPG(&pgInfo)
	// Take the larger of the podgroup's MinMember and the value pod.GetMinMember
	// derives from the pods.
	replicas := max(int(pgInfo.Spec.MinMember), pod.GetMinMember(podsInJob))
	return constant.JobInfo{
		Key:       key,
		Name:      name,
		PgName:    pgInfo.Name,
		Replicas:  replicas,
		// One configmap per safeDeviceSize servers, rounded up; at least one for
		// non-negative replica counts.
		TotalCmNum:         (replicas-1)/safeDeviceSize + 1,
		JobType:            podgroup.GetJobTypeByPG(&pgInfo),
		NameSpace:          pgInfo.Namespace,
		Framework:          podgroup.GetModelFramework(&pgInfo),
		ResourceType:       podgroup.GetResourceType(&pgInfo),
		CustomJobID:        pgInfo.Annotations[CustomJobID],
		MultiInstanceJobId: pgInfo.Labels[constant.MindIeJobIdLabelKey],
		AppType:            pgInfo.Labels[constant.MindIeAppTypeLabelKey],
		AddTime:            time.Now().Unix(),
		Sid:                getSidForJobInfo(pgInfo, podsInJob),
	}
}
// UpdateCmAndCache rebuilds the rank table of job jobKey from podsInJob, sets its
// status to status, and writes the configmaps through updateCM. updateCM is called
// once for every index in [0, TotalCmNum). The result is saved to the job cache
// only if every updateCM call succeeded.
//
// The cached entry is deep-copied before it is modified. When the job is not cached
// (or has an empty name), its basic info is derived from podGroup instead.
//
// When the completed-pod count reported by pod.ConstructRankTableByPod equals
// Replicas, the job is finalized:
//   - the rank table is marked complete;
//   - its server list is stored in PreServerList;
//   - node names are recorded;
//   - shared-tor and master-address info is initialized.
func UpdateCmAndCache(status string, jobKey string, podGroup v1beta1.PodGroup,
	podsInJob map[string]v1.Pod) {
	info, cached := GetJobCacheDeepCopy(jobKey)
	if !cached || info.Name == "" {
		info = getJobBasicInfoByPG(podGroup, podsInJob)
	}
	if info.AddTime == 0 {
		info.AddTime = time.Now().Unix()
	}
	info.Status = status
	info.IsPreDelete = false

	var completed int
	info.JobRankTable, completed = pod.ConstructRankTableByPod(podsInJob, info.Replicas)
	if info.Framework == "" {
		info.Framework = pod.GetModelFramework(podsInJob)
	}
	info.LastUpdatedCmTime = time.Now().Unix()

	if completed != info.Replicas {
		info.JobRankTable.Status = StatusRankTableInit
	} else {
		info.JobRankTable.Status = StatusRankTableComplete
		info.PreServerList = info.JobRankTable.ServerList
		updateUseNodeNames(&info, podsInJob)
		initJobShareTorInfo(&info, podsInJob)
	}
	info.JobRankTable.Total = info.TotalCmNum

	parts := getHcclSlice(info.JobRankTable)
	allOK := true
	for idx := 0; idx < info.TotalCmNum; idx++ {
		// Configmaps beyond the serialized parts are written with an empty payload.
		part := ""
		if idx < len(parts) {
			part = parts[idx]
		}
		// Keep writing the remaining configmaps after a failure; only the cache save is skipped.
		if !updateCM(info, idx, part) {
			allOK = false
		}
	}
	if !allOK {
		return
	}
	hwlog.RunLog.Debugf("update job:%s success", info.Name)
	SaveJobCache(info.Key, info)
}
// updateUseNodeNames records, in jobInfo.NodeNames, the node each pod in podsInJob is
// scheduled on, keyed by pod UID. Existing entries for pods that are no longer
// present are kept. A fresh map is always built and assigned, so the map previously
// referenced by jobInfo.NodeNames is never mutated. After the call, NodeNames is
// non-nil.
func updateUseNodeNames(jobInfo *constant.JobInfo, podsInJob map[string]v1.Pod) {
	// Ranging over (and len of) a nil map is valid, so no nil pre-check is needed.
	merged := make(map[string]string, len(jobInfo.NodeNames))
	for uid, node := range jobInfo.NodeNames {
		merged[uid] = node
	}
	for _, p := range podsInJob {
		merged[string(p.UID)] = p.Spec.NodeName
	}
	jobInfo.NodeNames = merged
}
// initJobShareTorInfo fills SharedTorIp and MasterAddr for jobs whose Framework is
// ptFramework, provided neither field has been set yet.
//
// SharedTorIp comes from pod.GetSharedTorIpByPod. MasterAddr depends on JobType:
//   - vcJobKind jobs use the HostIp of the first rank-table server (left empty if
//     the server list is empty);
//   - other jobs use the masterAddr environment variable of their pods.
func initJobShareTorInfo(jobInfo *constant.JobInfo, podsInJob map[string]v1.Pod) {
	if jobInfo.Framework != ptFramework || jobInfo.MasterAddr != "" || jobInfo.SharedTorIp != "" {
		return
	}
	jobInfo.SharedTorIp = pod.GetSharedTorIpByPod(podsInJob)
	switch {
	case jobInfo.JobType != vcJobKind:
		jobInfo.MasterAddr = pod.GetEnvByPod(podsInJob, masterAddr)
	case len(jobInfo.JobRankTable.ServerList) > 0:
		jobInfo.MasterAddr = jobInfo.JobRankTable.ServerList[0].HostIp
	}
}
// getHcclSlice serializes table into one JSON document per configmap. The server list
// is split into consecutive chunks of at most safeDeviceSize servers. Each document
// carries the table's other fields unchanged, plus its own chunk as ServerList.
//
// Element i of the result is the payload for configmap i (callers index it that
// way). A chunk that fails to marshal is therefore represented by "" rather than
// skipped; skipping would shift every later chunk onto the wrong configmap.
// Returns nil when the table has no servers.
func getHcclSlice(table constant.RankTable) []string {
	servers := table.ServerList
	if len(servers) == 0 {
		return nil
	}
	// Size from the actual chunk count instead of table.Total, which may disagree
	// (or be negative, which would make make() panic).
	chunkCount := (len(servers) + safeDeviceSize - 1) / safeDeviceSize
	hcclJsons := make([]string, 0, chunkCount)
	for i := 0; i < chunkCount; i++ {
		start := i * safeDeviceSize
		end := min(start+safeDeviceSize, len(servers))
		// table is a by-value parameter, so replacing its ServerList does not touch the caller's table.
		table.ServerList = servers[start:end]
		data, err := json.Marshal(table)
		if err != nil {
			hwlog.RunLog.Errorf("Marshal hccl json part %v error, error is %v", i, err)
			hcclJsons = append(hcclJsons, "")
			continue
		}
		hcclJsons = append(hcclJsons, string(data))
	}
	return hcclJsons
}
// GetJobServerInfoMap builds a snapshot over every cached job. For each job key it
// records three things:
//   - InfoMap: a copy of the job's PreServerList indexed by server name;
//   - RetryTolerate: the result of podgroup.JudgeRetryByJobKey;
//   - ResourceType: the job's resource type.
func GetJobServerInfoMap() constant.JobServerInfoMap {
	jobs := GetAllJobCache()
	snapshot := constant.JobServerInfoMap{
		InfoMap:       make(map[string]map[string]constant.ServerHccl, len(jobs)),
		RetryTolerate: make(map[string]bool, len(jobs)),
		ResourceType:  make(map[string]string, len(jobs)),
	}
	for key, info := range jobs {
		snapshot.InfoMap[key] = buildJobServerInfoMap(info)
		snapshot.RetryTolerate[key] = podgroup.JudgeRetryByJobKey(key)
		snapshot.ResourceType[key] = info.ResourceType
	}
	return snapshot
}
// buildJobServerInfoMap returns jobInfo.PreServerList indexed by ServerName. If two
// servers share a name, the later one wins.
//
// Each entry is a fresh ServerHccl with these fields copied: ServerID, HostIp,
// PodID, PodNameSpace, ServerName, ServerSN, PodName and ContainerIds. Its
// DeviceList is newly allocated (never nil) and keeps only DeviceID, DeviceIP and
// RankID of each device. All other fields are left at their zero values.
//
// NOTE(review): ContainerIds is assigned, not cloned; if it is a slice it still
// aliases the cached data.
func buildJobServerInfoMap(jobInfo constant.JobInfo) map[string]constant.ServerHccl {
	byName := make(map[string]constant.ServerHccl, len(jobInfo.PreServerList))
	for i := range jobInfo.PreServerList {
		src := &jobInfo.PreServerList[i]
		devices := make([]constant.Device, 0, len(src.DeviceList))
		for _, d := range src.DeviceList {
			devices = append(devices, constant.Device{
				DeviceID: d.DeviceID,
				DeviceIP: d.DeviceIP,
				RankID:   d.RankID,
			})
		}
		byName[src.ServerName] = constant.ServerHccl{
			DeviceList:   devices,
			ServerID:     src.ServerID,
			HostIp:       src.HostIp,
			PodID:        src.PodID,
			PodNameSpace: src.PodNameSpace,
			ServerName:   src.ServerName,
			ServerSN:     src.ServerSN,
			PodName:      src.PodName,
			ContainerIds: src.ContainerIds,
		}
	}
	return byName
}
// GetJobIsRunning reports whether the cached job jobKey has status StatusJobRunning.
// On a cache miss the value GetJobCache falls back to is checked, which is
// presumably the zero value and therefore "not running".
func GetJobIsRunning(jobKey string) bool {
	info, _ := GetJobCache(jobKey) // presence flag deliberately ignored; see above
	return StatusJobRunning == info.Status
}
// GetJobIsExists reports whether jobKey currently has an entry in the job cache.
func GetJobIsExists(jobKey string) bool {
	_, found := GetJobCache(jobKey)
	return found
}
// FlushLastUpdateTime sets LastUpdatedCmTime of the cached job jobKey to the current
// Unix time and writes the entry back to the cache. Unknown keys are ignored.
func FlushLastUpdateTime(jobKey string) {
	if info, found := GetJobCache(jobKey); found {
		info.LastUpdatedCmTime = time.Now().Unix()
		SaveJobCache(jobKey, info)
	}
}
// IsMindIeServerPod reports whether podInfo meets both conditions:
//   - it carries a non-empty constant.MindIeJobIdLabelKey label;
//   - its constant.MindIeAppTypeLabelKey label equals constant.ServerAppType.
func IsMindIeServerPod(podInfo v1.Pod) bool {
	labels := podInfo.Labels
	if labels == nil {
		return false
	}
	if labels[constant.MindIeJobIdLabelKey] == "" {
		return false
	}
	return labels[constant.MindIeAppTypeLabelKey] == constant.ServerAppType
}
// IsMindIeServerJob reports whether jobInfo meets all three conditions:
//   - it is non-nil;
//   - it has a MultiInstanceJobId;
//   - its AppType is constant.ServerAppType.
func IsMindIeServerJob(jobInfo *constant.JobInfo) bool {
	if jobInfo == nil || jobInfo.MultiInstanceJobId == "" {
		return false
	}
	return jobInfo.AppType == constant.ServerAppType
}
// GetCustomFilterFaultJobAndUsedDeviceInfoMap scans the cached jobs that have pods and
// for which custom.JudgeFilterFaultAnnosByJobKey returns true. It records which nodes
// those jobs' pods run on and which devices they use on each node.
//
// Both returned maps are keyed by node name, then job key:
//   - jobInfoMap[node][jobKey] is the cached JobInfo of a job with a pod on node;
//   - deviceInfoMap[node][jobKey] is the set of device names listed (comma-separated)
//     in the api.PodAnnotationAscendReal annotation of that job's pods scheduled on
//     node. The set is empty if none of them carry the annotation.
//
// Each (node, job) pair gets its own set. Device names repeat from node to node, so
// one set shared across a job's nodes would report devices used on other nodes as
// used on this one.
func GetCustomFilterFaultJobAndUsedDeviceInfoMap() (map[string]map[string]constant.JobInfo,
	map[string]map[string]sets.String) {
	jobInfoMap := make(map[string]map[string]constant.JobInfo)
	deviceInfoMap := make(map[string]map[string]sets.String)
	for jobKey, jobInfo := range GetAllJobCache() {
		podsInJob := pod.GetPodByJobId(jobKey)
		if len(podsInJob) == 0 || !custom.JudgeFilterFaultAnnosByJobKey(jobKey) {
			continue
		}
		for _, podInfo := range podsInJob {
			nodeName := podInfo.Spec.NodeName
			if _, exists := jobInfoMap[nodeName]; !exists {
				jobInfoMap[nodeName] = make(map[string]constant.JobInfo)
				deviceInfoMap[nodeName] = make(map[string]sets.String)
			}
			jobInfoMap[nodeName][jobKey] = jobInfo
			usedDevices, exists := deviceInfoMap[nodeName][jobKey]
			if !exists {
				usedDevices = sets.String{}
				deviceInfoMap[nodeName][jobKey] = usedDevices
			}
			if realDevice := podInfo.Annotations[api.PodAnnotationAscendReal]; realDevice != "" {
				// sets.String is a map, so Insert updates the stored set in place.
				usedDevices.Insert(strings.Split(realDevice, constant.Comma)...)
			}
		}
	}
	return jobInfoMap, deviceInfoMap
}
// DeepCopyServerHcclSlice returns a copy of serverList whose backing array and whose
// per-server DeviceList slices are not shared with the input. A nil or empty input
// yields a non-nil empty slice. A nil DeviceList stays nil.
//
// NOTE(review): only DeviceList is re-allocated; other reference-typed ServerHccl
// fields (e.g. ContainerIds, if it is a slice) remain shared.
func DeepCopyServerHcclSlice(serverList []constant.ServerHccl) []constant.ServerHccl {
	out := make([]constant.ServerHccl, len(serverList))
	copy(out, serverList)
	for i := range out {
		if devs := out[i].DeviceList; devs != nil {
			out[i].DeviceList = append(make([]constant.Device, 0, len(devs)), devs...)
		}
	}
	return out
}
func DeepCopyJobInfo(job *constant.JobInfo) *constant.JobInfo {
copyJob := *job
if job.NodeNames != nil {
copyJob.NodeNames = make(map[string]string, len(job.NodeNames))
for k, v := range job.NodeNames {
copyJob.NodeNames[k] = v
}
}
copyJob.PreServerList = DeepCopyServerHcclSlice(job.PreServerList)
copyJob.JobRankTable.ServerList = DeepCopyServerHcclSlice(job.JobRankTable.ServerList)
return ©Job
}