Copyright(C) 2023. Huawei Technologies Co.,Ltd. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
Package controllers is using for reconcile AscendJob.
*/
package v1
import (
"strconv"
"strings"
"sync"
"time"
commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
corev1 "k8s.io/api/core/v1"
"ascend-common/api"
"ascend-common/common-utils/hwlog"
mindxdlv1 "ascend-operator/pkg/api/v1"
)
func getContainerExitCode(pod *corev1.Pod) int32 {
var exitCode int32 = 0xbeef
for _, status := range pod.Status.ContainerStatuses {
state := status.State
if status.Name == api.DefaultContainerName && state.Terminated != nil {
exitCode = state.Terminated.ExitCode
}
}
return exitCode
}
func initializeReplicaStatuses(jobStatus *commonv1.JobStatus, rtype commonv1.ReplicaType) {
if jobStatus.ReplicaStatuses == nil {
jobStatus.ReplicaStatuses = make(map[commonv1.ReplicaType]*commonv1.ReplicaStatus)
}
jobStatus.ReplicaStatuses[rtype] = &commonv1.ReplicaStatus{}
}
func updateJobReplicaStatuses(jobStatus *commonv1.JobStatus, rtype commonv1.ReplicaType, pod *corev1.Pod) {
hwlog.RunLog.Debugf("before updateJobReplicaStatuses status<%#v> by pod<%s> phase<%s>",
jobStatus.ReplicaStatuses[rtype], pod.Name, pod.Status.Phase)
defer hwlog.RunLog.Debugf("after updateJobReplicaStatuses status<%#v>", jobStatus.ReplicaStatuses[rtype])
switch pod.Status.Phase {
case corev1.PodRunning:
jobStatus.ReplicaStatuses[rtype].Active++
case corev1.PodSucceeded:
jobStatus.ReplicaStatuses[rtype].Succeeded++
case corev1.PodFailed:
if pod.DeletionTimestamp != nil {
hwlog.RunLog.Infof("pod<%s> is deleting, so it can not be treat as failed", pod.Name)
return
}
jobStatus.ReplicaStatuses[rtype].Failed++
default:
}
}
func ContainsChiefOrMasterSpec(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec) bool {
if _, ok := replicas[mindxdlv1.TensorflowReplicaTypeChief]; ok {
return true
}
if _, ok := replicas[mindxdlv1.PytorchReplicaTypeMaster]; ok {
return true
}
return false
}
func getContainerResourceReq(ct corev1.Container) int {
for rName, rNum := range ct.Resources.Requests {
if strings.Contains(string(rName), api.ResourceNamePrefix) {
return int(rNum.Value())
}
}
return 0
}
func getContainerNPUResourceNameAndReq(ct corev1.Container) (string, int) {
for rName, rNum := range ct.Resources.Requests {
if strings.Contains(string(rName), api.ResourceNamePrefix) {
return string(rName), int(rNum.Value())
}
}
return "", 0
}
func getNpuReqInfoPerPod(job *mindxdlv1.AscendJob) (string, int) {
npuWorker := getNpuWorkerSpec(job)
if npuWorker == nil {
return "", 0
}
for _, ct := range npuWorker.Template.Spec.Containers {
if ct.Name == api.DefaultContainerName {
return getContainerNPUResourceNameAndReq(ct)
}
}
return "", 0
}
func getNpuWorkerSpec(job *mindxdlv1.AscendJob) *commonv1.ReplicaSpec {
status := getNonWorkerPodMountChipStatus(job)
for rtype, spec := range job.Spec.ReplicaSpecs {
if status {
return spec
}
if rtype == mindxdlv1.ReplicaTypeWorker {
return spec
}
}
return nil
}
func localRankStr(req int) string {
if req == 0 {
return ""
}
rankStr := ""
for i := 0; i < req-1; i++ {
rankStr += strconv.Itoa(i) + ","
}
rankStr += strconv.Itoa(req - 1)
return rankStr
}
func getTotalNpuReplicas(job *mindxdlv1.AscendJob) int {
jobReplicas := int32(0)
status := getNonWorkerPodMountChipStatus(job)
for rtype, spec := range job.Spec.ReplicaSpecs {
if !status && rtype != mindxdlv1.ReplicaTypeWorker {
continue
}
jobReplicas += *spec.Replicas
}
return int(jobReplicas)
}
func getTotalReplicas(job *mindxdlv1.AscendJob) int32 {
jobReplicas := int32(0)
for _, spec := range job.Spec.ReplicaSpecs {
jobReplicas += *spec.Replicas
}
return jobReplicas
}
func getRestartCondition(conds []commonv1.JobCondition) *commonv1.JobCondition {
for _, condition := range conds {
if condition.Type == commonv1.JobRestarting {
return &commonv1.JobCondition{
Reason: condition.Reason,
Message: condition.Message,
}
}
}
return nil
}
func specReplicas(spec *commonv1.ReplicaSpec) int32 {
if spec.Replicas == nil {
return int32(1)
}
return *spec.Replicas
}
type specInfo struct {
name commonv1.ReplicaType
job *mindxdlv1.AscendJob
spec *commonv1.ReplicaSpec
status *commonv1.ReplicaStatus
}
type podInfo struct {
frame string
job *mindxdlv1.AscendJob
clusterdSvcIp string
status *commonv1.ReplicaStatus
rtype commonv1.ReplicaType
isDynamicCutJob bool
isSoftShareDevJob bool
index int
spec *commonv1.ReplicaSpec
isMaster bool
ip string
port string
ctReq int
npuReplicas int
rank int
backupPodName string
}
func (pi *podInfo) DeepCopy() *podInfo {
return &podInfo{
isDynamicCutJob: pi.isDynamicCutJob,
isSoftShareDevJob: pi.isSoftShareDevJob,
frame: pi.frame,
job: pi.job,
status: pi.status,
rtype: pi.rtype,
spec: pi.spec,
ip: pi.ip,
port: pi.port,
ctReq: pi.ctReq,
npuReplicas: pi.npuReplicas,
clusterdSvcIp: pi.clusterdSvcIp,
}
}
type validateError struct {
reason string
message string
}
func (ve *validateError) Error() string {
return ve.message
}
func filterPodsByReplicaType(pods []*corev1.Pod, rt string) []*corev1.Pod {
var filtered []*corev1.Pod
for _, pod := range pods {
if pod.Labels[commonv1.ReplicaTypeLabel] == rt {
filtered = append(filtered, pod)
}
}
return filtered
}
func checkNonWorkerRplMountChips(ji *jobInfo) bool {
for rtype, spec := range ji.rpls {
if rtype == mindxdlv1.ReplicaTypeWorker {
continue
}
if checkContainersResourceReq(spec.Template.Spec.Containers) {
return true
}
}
return false
}
func checkContainersResourceReq(containers []corev1.Container) bool {
for _, container := range containers {
if container.Name == api.DefaultContainerName {
rNum := getContainerResourceReq(container)
if rNum > 0 {
return true
}
}
}
return false
}
func getNonWorkerPodMountChipStatus(job *mindxdlv1.AscendJob) bool {
annotations := job.GetAnnotations()
status, ok := annotations[nonWorkerPodMountChipStatus]
if !ok {
return false
}
return status == "true"
}
func checkNpuPod(pi *podInfo) bool {
for rtype, spec := range pi.job.Spec.ReplicaSpecs {
if rtype != pi.rtype {
continue
}
return checkContainersResourceReq(spec.Template.Spec.Containers)
}
return false
}
func getJobRequiredNpu(job *mindxdlv1.AscendJob) int {
requiredNpu := 0
for _, spec := range job.Spec.ReplicaSpecs {
for _, container := range spec.Template.Spec.Containers {
requiredNpu += getContainerResourceReq(container)
}
}
return requiredNpu
}
type batchCreateManager struct {
stateMutex sync.RWMutex
lastFailureTime time.Time
failedFlag bool
}
func (bcm *batchCreateManager) tryBatchCreate() bool {
bcm.stateMutex.Lock()
defer bcm.stateMutex.Unlock()
return !bcm.failedFlag || time.Since(bcm.lastFailureTime) > defaultBatchCreateFailInterval
}
func (bcm *batchCreateManager) updateUnavailableStatus() {
bcm.stateMutex.Lock()
defer bcm.stateMutex.Unlock()
bcm.failedFlag = true
bcm.lastFailureTime = time.Now()
}
func isSoftShareDevJob(job *mindxdlv1.AscendJob) bool {
if job == nil {
return false
}
_, hasAicoreQuota := job.Labels[api.SchedulerSoftShareDevAicoreQuotaKey]
_, hasHbmQuota := job.Labels[api.SchedulerSoftShareDevHbmQuotaKey]
_, hasSchedulingPolicy := job.Labels[api.SchedulerSoftShareDevPolicyKey]
return hasAicoreQuota && hasHbmQuota && hasSchedulingPolicy
}