package jobv2
import (
"context"
"sync"
"time"
"golang.org/x/time/rate"
"k8s.io/api/core/v1"
"volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"ascend-common/common-utils/hwlog"
"clusterd/pkg/application/statistics"
"clusterd/pkg/common/constant"
"clusterd/pkg/domain/job"
"clusterd/pkg/domain/pod"
"clusterd/pkg/domain/podgroup"
)
// Operators stored as values in uniqueQueue. Handler switches on them to decide
// which job routine to run for the queued job key.
const (
// queueOperatorAdd is queued by podGroupMessage for constant.AddOperator; Handler calls addJob.
queueOperatorAdd = "add"
// queueOperatorUpdate is queued by podGroupMessage, podMessage and
// addUpdateMessageIfOutdated; Handler calls updateJob.
queueOperatorUpdate = "update"
// queueOperatorPreDelete is queued by podGroupMessage for constant.DeleteOperator;
// Handler calls preDeleteJob.
queueOperatorPreDelete = "preDelete"
// queueOperatorDelete is queued by preDeleteToDelete; Handler calls deleteJob.
queueOperatorDelete = "delete"
)
// uniqueQueue holds the pending job messages. Entries are stored with a job key as
// the map key and one of the queueOperator* constants as the value, so storing a
// message for a job that already has one overwrites the earlier operator.
var uniqueQueue sync.Map
// limiter is created in init and is waited on by Handler before each message is
// processed.
var limiter *rate.Limiter
const (
// limit is the rate handed to rate.NewLimiter in init (events per second).
limit = 5
// burst is the burst size handed to rate.NewLimiter in init.
burst = 20
// messageNumThreshold is the number of pending messages above which
// checkQueueBlock reports the queue as blocked.
messageNumThreshold = 5
)
// init builds the package-level limiter from the limit and burst constants.
func init() {
	limiter = rate.NewLimiter(rate.Limit(limit), burst)
}
func Checker(ctx context.Context) {
hourTimer := time.NewTicker(time.Hour)
defer hourTimer.Stop()
minuteTimer := time.NewTicker(time.Minute)
defer minuteTimer.Stop()
for {
select {
case <-ctx.Done():
return
case <-hourTimer.C:
if !checkQueueBlock() {
addUpdateMessageIfOutdated()
}
case <-minuteTimer.C:
preDeleteToDelete()
}
}
}
// preDeleteToDelete queues a delete message for every job key returned by
// job.GetShouldDeleteJobKey. It does nothing when no key is returned.
func preDeleteToDelete() {
	for _, key := range job.GetShouldDeleteJobKey() {
		uniqueQueue.Store(key, queueOperatorDelete)
	}
}
// addUpdateMessageIfOutdated walks the job keys returned by
// job.GetShouldUpdateJobKey; for each one it calls job.FlushLastUpdateTime and
// then queues an update message. It does nothing when no key is returned.
func addUpdateMessageIfOutdated() {
	for _, key := range job.GetShouldUpdateJobKey() {
		job.FlushLastUpdateTime(key)
		uniqueQueue.Store(key, queueOperatorUpdate)
	}
}
// checkQueueBlock counts the messages currently held in uniqueQueue and reports
// whether that count is greater than messageNumThreshold. A blocked queue is
// also logged as an error together with the observed count.
func checkQueueBlock() bool {
	pending := 0
	uniqueQueue.Range(func(_, _ interface{}) bool {
		pending++
		return true
	})

	if pending <= messageNumThreshold {
		return false
	}
	hwlog.RunLog.Errorf("queue blocking. more than %d pending messages, current: %d",
		messageNumThreshold, pending)
	return true
}
func Handler(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
jobUniqueKey := ""
operator := ""
uniqueQueue.Range(func(key, value interface{}) bool {
ok := false
jobUniqueKey, ok = key.(string)
if !ok {
return true
}
operator, ok = value.(string)
if !ok {
return true
}
return false
})
if operator == "" {
time.Sleep(time.Second)
break
}
err := limiter.Wait(ctx)
if err != nil {
hwlog.RunLog.Errorf("limiter wait failed, err: %v", err)
return
}
uniqueQueue.Delete(jobUniqueKey)
switch operator {
case queueOperatorAdd:
addJob(jobUniqueKey)
jobStcMessage(jobUniqueKey, constant.PGAdd)
case queueOperatorUpdate:
updateJob(jobUniqueKey)
jobStcMessage(jobUniqueKey, constant.PGUpdate)
case queueOperatorPreDelete:
preDeleteJob(jobUniqueKey)
jobStcMessage(jobUniqueKey, constant.PGDelete)
case queueOperatorDelete:
jobStcMessage(jobUniqueKey, constant.JobInfoDelete)
deleteJob(jobUniqueKey)
default:
hwlog.RunLog.Errorf("error operator: %s", operator)
}
}
}
}
func jobStcMessage(jobKey string, operator string) {
notifyMsg := constant.JobNotifyMsg{Operator: operator, JobKey: jobKey}
statistics.GlobalJobCollectMgr.JobNotify <- notifyMsg
}
// podGroupMessage translates an informer operator for a PodGroup into a queue
// message keyed by podgroup.GetJobKeyByPG(newPGInfo): add maps to add, delete
// maps to preDelete and update maps to update. Any other operator is logged as
// an error and nothing is queued.
func podGroupMessage(newPGInfo *v1beta1.PodGroup, operator string) {
	var queueOp string
	switch operator {
	case constant.AddOperator:
		queueOp = queueOperatorAdd
	case constant.DeleteOperator:
		queueOp = queueOperatorPreDelete
	case constant.UpdateOperator:
		queueOp = queueOperatorUpdate
	default:
		hwlog.RunLog.Errorf("abnormal informer operator: %s", operator)
		return
	}
	// The job key is resolved only once a known operator has been matched.
	uniqueQueue.Store(podgroup.GetJobKeyByPG(newPGInfo), queueOp)
}
// podMessage queues an update message under the key returned by
// pod.GetJobKeyByPod(newPodInfo). oldPodInfo and operator are not used: every
// pod event results in the same update message.
func podMessage(oldPodInfo, newPodInfo *v1.Pod, operator string) {
	jobKey := pod.GetJobKeyByPod(newPodInfo)
	uniqueQueue.Store(jobKey, queueOperatorUpdate)
}