Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package schedulingexception
import (
"context"
"fmt"
"strings"
"sync"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"volcano.sh/apis/pkg/apis/batch/v1alpha1"
"volcano.sh/apis/pkg/apis/scheduling/v1beta1"
batchv1 "ascend-common/api/ascend-operator/apis/batch/v1"
"ascend-common/common-utils/hwlog"
"clusterd/pkg/common/constant"
"clusterd/pkg/common/util"
"clusterd/pkg/domain/job"
"clusterd/pkg/domain/pod"
"clusterd/pkg/interface/kube"
)
var (
collector *Collector
newCollector sync.Once
)
type Config struct {
CheckInterval int64
}
func CheckSchedulingException(ctx context.Context, config *Config) {
newCollector.Do(func() {
if config == nil {
hwlog.RunLog.Error("scheduling exception collector config is nil")
return
}
if config.CheckInterval <= 0 {
hwlog.RunLog.Error("scheduling exception collector check interval is invalid")
config.CheckInterval = defaultCheckInterval
}
collector = &Collector{
jobExceptions: map[string]*jobExceptionInfo{},
checkInterval: config.CheckInterval,
}
})
if collector == nil {
hwlog.RunLog.Error("scheduling exception collector is nil")
return
}
go collector.Start(ctx)
}
type Collector struct {
isRunning bool
checkInterval int64
jobExceptions map[string]*jobExceptionInfo
}
type exceptionReport struct {
JobExceptions map[string]*jobExceptionInfo `json:"jobs"`
}
func (c *Collector) Start(ctx context.Context) {
if c.isRunning {
hwlog.RunLog.Info("scheduling exception collector is already running")
return
}
c.isRunning = true
defer func() {
c.isRunning = false
}()
ticker := time.NewTicker(time.Duration(c.checkInterval) * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
hwlog.RunLog.Info("scheduling exception collector context done")
return
case <-ticker.C:
c.checkJobs()
}
}
}
func (c *Collector) checkJobs() {
allJobs := job.GetAllJobCache()
exceptionJobs := make(map[string]*jobExceptionInfo, 0)
for jobKey, jobInfo := range allJobs {
cond := c.processPodGroup(jobKey, jobInfo)
if cond == nil {
continue
}
if _, exist := c.jobExceptions[jobKey]; !exist {
hwlog.RunLog.Infof("job %s has scheduling exception", jobKey)
c.jobExceptions[jobKey] = &jobExceptionInfo{
JobName: jobInfo.Name,
JobType: jobInfo.JobType,
NameSpace: jobInfo.NameSpace,
Condition: *cond,
}
}
hwlog.RunLog.Infof("pgname: %v, cond: %v, pre cond: %v", jobInfo.PgName, cond, c.jobExceptions[jobKey].Condition)
key := jobInfo.Name + "." + jobKey
if cond.Equal(c.jobExceptions[jobKey].Condition) {
exceptionJobs[key] = c.jobExceptions[jobKey]
}
c.jobExceptions[jobKey].Condition = *cond
}
allMetaObjs := c.processJobs(exceptionJobs, allJobs)
c.cleanupJobs(allJobs, allMetaObjs)
report := exceptionReport{
JobExceptions: exceptionJobs,
}
if err := updateConfigMap(report); err != nil {
hwlog.RunLog.Errorf("update scheduling exception configmap failed: %v", err)
return
}
hwlog.RunLog.Infof("updated scheduling exception configmap with %d jobs", len(exceptionJobs))
}
func getPgFromCache(namespace, name string) *v1beta1.PodGroup {
obj, err := kube.GetObject(kube.PodGroupGVK(), fmt.Sprintf("%s/%s", namespace, name))
if err != nil {
hwlog.RunLog.Warnf("get podgroup %s/%s failed: %v", namespace, name, err)
return nil
}
pg, ok := obj.(*v1beta1.PodGroup)
if !ok {
hwlog.RunLog.Warnf("convert object to PodGroup failed for job %s/%s", namespace, name)
return nil
}
return pg
}
func (c *Collector) processPodGroup(jobKey string, jobInfo constant.JobInfo) *conditionDetail {
if jobInfo.Status == job.StatusJobRunning || jobInfo.Status == job.StatusJobCompleted {
delete(c.jobExceptions, jobKey)
return nil
}
pods := pod.GetPodByJobId(jobKey)
pg := getPgFromCache(jobInfo.NameSpace, jobInfo.PgName)
if pg == nil {
return c.processPodGroupUnknown(pods)
}
indices := c.analyzePodGroupConditions(pg)
cond := c.analyzePodGroupPhase(pg, pods, indices, jobInfo)
return cond
}
func (c *Collector) analyzePodGroupConditions(pg *v1beta1.PodGroup) conditionIndices {
indices := conditionIndices{
jobEnqueueFailedIndex: invalidIndex,
jobValidFailedIndex: invalidIndex,
predicatedNodesErrorIndex: invalidIndex,
batchOrderFailedIndex: invalidIndex,
notEnoughResourcesIndex: invalidIndex,
}
for index, condition := range pg.Status.Conditions {
if condition.Type != v1beta1.PodGroupUnschedulableType || condition.Status != corev1.ConditionTrue {
continue
}
switch condition.Reason {
case jobEnqueueFailedReason:
indices.jobEnqueueFailedIndex = index
case jobValidateFailedReason:
indices.jobValidFailedIndex = index
case nodePredicateFailedReason:
indices.predicatedNodesErrorIndex = index
case batchOrderFailedReason:
indices.batchOrderFailedIndex = index
case notEnoughResourcesReason:
indices.notEnoughResourcesIndex = index
default:
hwlog.RunLog.Warnf("unknown condition reason: %s", condition.Reason)
}
}
hwlog.RunLog.Infof("pg phase: %s, conditionIndices %#v", pg.Status.Phase, indices)
return indices
}
func (c *Collector) analyzePodGroupPhase(pg *v1beta1.PodGroup, pods map[string]corev1.Pod, indices conditionIndices,
jobInfo constant.JobInfo) *conditionDetail {
switch pg.Status.Phase {
case "":
return c.processPodGroupCreated()
case v1beta1.PodGroupUnknown:
return c.processPodGroupUnknown(pods)
case v1beta1.PodGroupInqueue:
return c.processPodGroupInqueue(pg, pods, indices, jobInfo)
case v1beta1.PodGroupPending:
return c.processPodGroupPending(pg, indices)
case v1beta1.PodGroupRunning:
return c.processPodGroupRunning(pods)
default:
return nil
}
}
func (c *Collector) processPodGroupCreated() *conditionDetail {
return &conditionDetail{
Status: podGroupCreated,
Reason: "PgNotInitialized",
Message: "pg phase is empty, volcano-scheduler maybe not running, you can check the status by" +
"executing command: kubectl get pod -n volcano-system -l app=volcano-scheduler",
}
}
func (c *Collector) processPodGroupUnknown(pods map[string]corev1.Pod) *conditionDetail {
for _, p := range pods {
if p.Status.Phase == corev1.PodPending {
return &conditionDetail{
Status: podGroupUnknown,
Reason: "PodPending",
Message: fmt.Sprintf("pod is pending, please check the pod status, "+
"executing command: kubectl describe pod -n %s %s", p.Namespace, p.Name),
}
}
if p.Status.Phase == corev1.PodFailed {
return c.processPodFailed(p)
}
}
return nil
}
func (c *Collector) processPodGroupInqueue(pg *v1beta1.PodGroup, pods map[string]corev1.Pod, indices conditionIndices,
jobInfo constant.JobInfo) *conditionDetail {
if indices.batchOrderFailedIndex >= 0 {
cond := &conditionDetail{
Status: podGroupInqueue,
Reason: pg.Status.Conditions[indices.batchOrderFailedIndex].Reason,
Message: pg.Status.Conditions[indices.batchOrderFailedIndex].Message,
}
if indices.predicatedNodesErrorIndex >= 0 {
cond.Message += fmt.Sprintf("; %s", pg.Status.Conditions[indices.predicatedNodesErrorIndex].Message)
}
return cond
}
if indices.predicatedNodesErrorIndex >= 0 {
return &conditionDetail{
Status: podGroupInqueue,
Reason: pg.Status.Conditions[indices.predicatedNodesErrorIndex].Reason,
Message: pg.Status.Conditions[indices.predicatedNodesErrorIndex].Message,
}
}
if indices.jobValidFailedIndex >= 0 {
return &conditionDetail{
Status: podGroupInqueue,
Reason: pg.Status.Conditions[indices.jobValidFailedIndex].Reason,
Message: pg.Status.Conditions[indices.jobValidFailedIndex].Message,
}
}
if indices.notEnoughResourcesIndex >= 0 {
return c.processNotEnoughResources(pg, pods, indices, jobInfo)
}
return nil
}
func (c *Collector) processNotEnoughResources(pg *v1beta1.PodGroup, pods map[string]corev1.Pod,
indices conditionIndices, jobInfo constant.JobInfo) *conditionDetail {
msg := pg.Status.Conditions[indices.notEnoughResourcesIndex].Message
if len(pods) < int(pg.Spec.MinMember) {
msg += fmt.Sprintf(" the number of pods is less than minMember, "+
"you can check the job status by excuting command: kubectl describe [job-type, "+
"eg vcjob,acjob] -n %s %s",
jobInfo.NameSpace, jobInfo.Name)
}
return &conditionDetail{
Status: podGroupInqueue,
Reason: pg.Status.Conditions[indices.notEnoughResourcesIndex].Reason,
Message: msg,
}
}
func (c *Collector) processPodGroupPending(pg *v1beta1.PodGroup, indices conditionIndices) *conditionDetail {
if indices.jobEnqueueFailedIndex >= 0 {
return &conditionDetail{
Status: podGroupPending,
Reason: pg.Status.Conditions[indices.jobEnqueueFailedIndex].Reason,
Message: pg.Status.Conditions[indices.jobEnqueueFailedIndex].Message,
}
}
return &conditionDetail{
Status: podGroupPending,
Reason: jobEnqueueFailedReason,
Message: fmt.Sprintf("the resources such as cpu, memory is not enough in Queue, "+
"you can check them by executing command: kubectl describe q %s, "+
"or view the log of volcano-scheduler", pg.Spec.Queue),
}
}
func (c *Collector) processPodGroupRunning(pods map[string]corev1.Pod) *conditionDetail {
for _, p := range pods {
if p.Status.Phase == corev1.PodFailed {
return c.processPodFailed(p)
}
if p.Status.Phase == corev1.PodPending {
return &conditionDetail{
Status: podGroupRunning,
Reason: "PodPending",
Message: fmt.Sprintf("pod is pending, "+
"please check the pod status using the following command: kubectl describe pod -n %s"+
" %s", p.Namespace, p.Name),
}
}
}
return nil
}
func (c *Collector) processPodFailed(p corev1.Pod) *conditionDetail {
if p.Status.Reason == "UnexpectedAdmissionError" && strings.Contains(p.Status.Message, "not get valid pod") {
return &conditionDetail{
Status: podGroupUnknown,
Reason: "PodFailed",
Message: "This pod may not be scheduled by the volcano scheduler or this pod is not valid for" +
" allocation." +
"Please check whether the annotation huawei." +
"com/Ascend910=<some-value> is present in the pod. If it is missing, " +
"inspect the Volcano scheduler logs for the keyword `Failed to get plugin volcano" +
"-npu_***" +
"` using the following command: kubectl logs -f -n volcano-system -l app=volcano" +
"-scheduler, if exist, " +
"check the config in configmap volcano-scheduler-configmap and the plugin " +
" in the dir of /plugins of the container of the volcano-scheduler",
}
}
return &conditionDetail{
Status: podGroupUnknown,
Reason: "PodFailed",
Message: fmt.Sprintf("pod is failed, "+
"please check the pod status using the following command: kubectl describe pod -n %s"+
" %s", p.Namespace, p.Name),
}
}
func (c *Collector) processJobs(exceptionJobs map[string]*jobExceptionInfo, pgInfos map[string]constant.JobInfo) map[string]metav1.Object {
if exceptionJobs == nil {
exceptionJobs = make(map[string]*jobExceptionInfo)
}
allMetaObjs := make(map[string]metav1.Object, 0)
for gvk, jobs := range kube.ListObjects(kube.AcJobGVK(), kube.VcJobGVK()) {
for _, jobObj := range jobs {
metaObj, ok := jobObj.(metav1.Object)
if !ok {
continue
}
if _, exist := pgInfos[string(metaObj.GetUID())]; exist {
continue
}
allMetaObjs[string(metaObj.GetUID())] = metaObj
hwlog.RunLog.Infof("job %s/%s is %s", metaObj.GetNamespace(), metaObj.GetName(), metaObj.GetUID())
cond := c.processJobObject(jobObj, metaObj)
if cond == nil {
continue
}
jobKey := string(metaObj.GetUID())
if _, exist := c.jobExceptions[jobKey]; !exist {
c.jobExceptions[jobKey] = &jobExceptionInfo{
NameSpace: metaObj.GetNamespace(),
JobName: metaObj.GetName(),
JobType: gvk.String(),
Condition: *cond,
}
}
key := metaObj.GetName() + "." + jobKey
if cond.Equal(c.jobExceptions[jobKey].Condition) {
exceptionJobs[key] = c.jobExceptions[jobKey]
}
c.jobExceptions[jobKey].Condition = *cond
}
}
return allMetaObjs
}
func (c *Collector) processJobObject(jobObj interface{}, metaObj metav1.Object) *conditionDetail {
var cond *conditionDetail
switch j := jobObj.(type) {
case *v1alpha1.Job:
cond = c.processVcJob(j)
case *batchv1.AscendJob:
cond = c.processAscendJob(j)
default:
hwlog.RunLog.Warn("unknown job type")
return nil
}
return cond
}
func (c *Collector) processVcJob(vcJob *v1alpha1.Job) *conditionDetail {
switch vcJob.Status.State.Phase {
case "":
return &conditionDetail{
Status: jobStatusEmpty,
Reason: "JobNoInitialized",
Message: "job condition is empty, volcano-controller maybe not running, " +
"you can check the status by executing command: kubectl get pod -n volcano-system -l" +
" app=volcano-controller",
}
case v1alpha1.Pending:
return &conditionDetail{
Status: jobStatusInitialized,
Reason: "JobPending",
Message: fmt.Sprintf("job is pending, "+
"you can check the job status by executing command: kubectl describe vcjob -n %s %s",
vcJob.GetNamespace(), vcJob.GetName()),
}
case v1alpha1.Failed:
return &conditionDetail{
Status: jobStatusFailed,
Reason: "JobFailed",
Message: fmt.Sprintf("job is failed, you can check the job status by executing command: "+
"kubectl describe vcjob -n %s %s", vcJob.GetNamespace(), vcJob.GetName()),
}
default:
return nil
}
}
func (c *Collector) processAscendJob(ascendJob *batchv1.AscendJob) *conditionDetail {
if len(ascendJob.Status.Conditions) == 0 {
return &conditionDetail{
Status: jobStatusEmpty,
Reason: "JobNoInitialized",
Message: "job condition is empty, ascend-operator maybe not running, " +
"you can check the status by executing command: kubectl get pod -n mindx-dl -l app" +
"=controller-manager",
}
}
failedConditionIndex, createdConditionIndex := invalidIndex, invalidIndex
for index, condition := range ascendJob.Status.Conditions {
if condition.Type == "Failed" && condition.Status == corev1.ConditionTrue {
failedConditionIndex = index
}
if condition.Type == "Created" && condition.Status == corev1.ConditionTrue {
createdConditionIndex = index
}
}
if failedConditionIndex >= 0 {
return &conditionDetail{
Status: jobStatusFailed,
Reason: ascendJob.Status.Conditions[failedConditionIndex].Reason,
Message: ascendJob.Status.Conditions[failedConditionIndex].Message,
}
}
if createdConditionIndex >= 0 {
return &conditionDetail{
Status: jobStatusInitialized,
Reason: ascendJob.Status.Conditions[createdConditionIndex].Reason,
Message: ascendJob.Status.Conditions[createdConditionIndex].Message,
}
}
return nil
}
func (c *Collector) cleanupJobs(allJobs map[string]constant.JobInfo, allMetaObjs map[string]metav1.Object) {
for key := range c.jobExceptions {
_, existInJobSummary := allJobs[key]
_, existInMetaObjs := allMetaObjs[key]
if !existInJobSummary && !existInMetaObjs {
hwlog.RunLog.Infof("job %s is not exist, delete from cache", key)
delete(c.jobExceptions, key)
}
}
}
func updateConfigMap(report exceptionReport) error {
data := make(map[string]string)
for key, jobInfo := range report.JobExceptions {
newJobInfo := *jobInfo
newJobInfo.Condition.Message = strings.Replace(newJobInfo.Condition.Message, "<", " ", -1)
newJobInfo.Condition.Message = strings.Replace(newJobInfo.Condition.Message, ">", "", -1)
data[key] = util.ObjToString(newJobInfo)
}
cm := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: cmName,
Namespace: cmNamespace,
},
Data: data,
}
_, err := kube.CreateConfigMap(cm)
if err == nil {
return nil
}
if !errors.IsAlreadyExists(err) {
return fmt.Errorf("create configmap failed: %v", err)
}
existingCM, err := kube.GetConfigMap(cmName, cmNamespace)
if err != nil {
return fmt.Errorf("get configmap failed: %v", err)
}
existingCM.Data = cm.Data
_, err = kube.UpdateConfigMap(existingCM)
if err != nil {
return fmt.Errorf("update configmap failed: %v", err)
}
return nil
}