/*
Copyright(C)2020-2022. Huawei Technologies Co.,Ltd. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package rescheduling is used for Huawei Ascend pin fault rescheduling.
*/
package rescheduling
import (
"encoding/json"
"errors"
"fmt"
"k8s.io/client-go/kubernetes"
"k8s.io/klog"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/plugins/ascend-volcano-plugin/common/k8s"
"volcano.sh/volcano/pkg/scheduler/plugins/ascend-volcano-plugin/common/util"
"volcano.sh/volcano/pkg/scheduler/plugins/ascend-volcano-plugin/plugin"
)
// init populates the package-level reSchedulerCache with a fresh, empty cache
// when the package is loaded.
func init() {
	reSchedulerCache = newReSchedulerCache()
}
// newReSchedulerCache returns a DealReSchedulerCache whose FaultNodes and
// FaultJobs maps are allocated and empty; every other field keeps its zero value.
func newReSchedulerCache() *DealReSchedulerCache {
	cache := new(DealReSchedulerCache)
	cache.FaultNodes = make(map[string]*FaultNode)
	cache.FaultJobs = make(map[api.JobID]*FaultJob)
	return cache
}
// GetReSchedulerCache returns the package-level rescheduling cache.
// The pointer is shared: callers see and modify the same instance.
func GetReSchedulerCache() *DealReSchedulerCache {
	return reSchedulerCache
}
// setFaultNodes replaces the cached fault-node map with faultNodes.
// The map is stored as is, not copied.
func (reCache *DealReSchedulerCache) setFaultNodes(faultNodes map[string]*FaultNode) {
	reCache.FaultNodes = faultNodes
}
// setFaultJobs replaces the cached fault-job map with faultJobs.
// The map is stored as is, not copied.
func (reCache *DealReSchedulerCache) setFaultJobs(faultJobs map[api.JobID]*FaultJob) {
	reCache.FaultJobs = faultJobs
}
// getRecentReschedulingRecordsFromCm decodes the JSON text in buffer into a map
// of reschedule reasons keyed by job ID.
// When buffer cannot be decoded it logs at debug level and returns a nil map
// together with an error describing the failure.
func (reCache DealReSchedulerCache) getRecentReschedulingRecordsFromCm(buffer string) (
	map[api.JobID]*RescheduleReason, error) {
	records := make(map[api.JobID]*RescheduleReason)
	decodeErr := json.Unmarshal([]byte(buffer), &records)
	if decodeErr == nil {
		return records, nil
	}
	klog.V(util.LogDebugLev).Info("Unmarshal reschedule records from cache failed")
	return nil, fmt.Errorf("reschedule records convert from CM error: %s", util.SafePrint(decodeErr))
}
// SetJobRecentRescheduleRecords loads JobRecentRescheduleRecords from the
// reschedule-reason configmap, but only when firstStartup is non-nil and true;
// in every other case it leaves the cache untouched and returns nil.
//
// It returns an error when the receiver is nil, when the configmap cannot be
// fetched through client, or when the stored records cannot be decoded.
func (reCache *DealReSchedulerCache) SetJobRecentRescheduleRecords(firstStartup *bool,
	client kubernetes.Interface) error {
	if reCache == nil {
		klog.V(util.LogErrorLev).Infof("SetJobRecentRescheduleRecords failed: %s", util.ArgumentError)
		return errors.New(util.ArgumentError)
	}
	// Nothing to restore unless the caller flags this as the first startup.
	if firstStartup == nil || !*firstStartup {
		return nil
	}
	cm, getErr := k8s.GetConfigMap(client, RescheduleReasonCmNamespace, RescheduleReasonCmName)
	if getErr != nil {
		return fmt.Errorf("failed to get reschedule reason configmap, err: %s", getErr.Error())
	}
	oldRecords, decodeErr := reCache.getRecentReschedulingRecordsFromCm(cm.Data[CmJobRescheduleReasonsKey])
	if decodeErr != nil {
		return fmt.Errorf("getRecentReschedulingRecordsFromCm %s", util.SafePrint(decodeErr))
	}
	reCache.JobRecentRescheduleRecords = oldRecords
	klog.V(util.LogDebugLev).Infof("start sync old rescheduling records %#v", oldRecords)
	return nil
}
// marshalCacheDataToString JSON-encodes data and returns the encoding as a string.
// On failure it logs the error and returns it unchanged with an empty string.
func (reCache *DealReSchedulerCache) marshalCacheDataToString(data interface{}) (string, error) {
	encoded, marshalErr := json.Marshal(data)
	if marshalErr == nil {
		return string(encoded), nil
	}
	klog.V(util.LogErrorLev).Infof("marshalCacheDataToString err: %s", util.SafePrint(marshalErr))
	return "", marshalErr
}
// getRealFaultJobs returns the cached jobs that qualify for rescheduling, keyed by JobUID.
//
// A job is skipped when it is neither flagged IsFaultJob nor reported by
// IsNormalJobNeedRestart, or when its ReScheduleKey equals JobOffRescheduleLabelValue.
// As a side effect, each returned job gets its faultReason set to the first entry of
// FaultTypes that is NodeUnhealthy, NodeCardUnhealthy, SubHealthFault or
// util.RelationFault, or to PodFailed when no such entry exists.
func (reCache DealReSchedulerCache) getRealFaultJobs() map[api.JobID]*FaultJob {
	selected := make(map[api.JobID]*FaultJob)
	for _, job := range reCache.FaultJobs {
		needsHandling := job.IsFaultJob || job.IsNormalJobNeedRestart()
		if !needsHandling || job.ReScheduleKey == JobOffRescheduleLabelValue {
			klog.V(util.LogDebugLev).Infof(
				"rescheduling: skip job <%s/%s> because the job has no fault or rescheduling is off",
				job.JobNamespace, job.JobName)
			continue
		}
		// Default to PodFailed; a node/card/sub-health/relation fault takes precedence.
		reason := PodFailed
		for _, candidate := range job.FaultTypes {
			if candidate == NodeUnhealthy || candidate == NodeCardUnhealthy || candidate == SubHealthFault ||
				candidate == util.RelationFault {
				reason = candidate
				break
			}
		}
		job.faultReason = reason
		selected[job.JobUID] = job
	}
	return selected
}
// GetRealFaultNodes returns the cached nodes whose IsFaultNode flag is set,
// keyed by NodeName. The result is a new map; the FaultNode values are shared
// with the cache.
func (reCache DealReSchedulerCache) GetRealFaultNodes() map[string]*FaultNode {
	faulty := make(map[string]*FaultNode)
	for _, node := range reCache.FaultNodes {
		if node.IsFaultNode {
			faulty[node.NodeName] = node
		}
	}
	return faulty
}
// writeFaultNodesToCMString returns the JSON text describing the faulty nodes.
//
// When no cached node is faulty it returns "" and leaves FaultNodes as it was.
// Otherwise it replaces FaultNodes with only the faulty nodes before encoding them.
func (reCache *DealReSchedulerCache) writeFaultNodesToCMString() (string, error) {
	faulty := reCache.GetRealFaultNodes()
	if len(faulty) == 0 {
		return "", nil
	}
	reCache.FaultNodes = faulty
	encoded, encodeErr := reCache.marshalCacheDataToString(getFaultNodeToCm(faulty))
	if encodeErr == nil {
		return encoded, nil
	}
	return "", fmt.Errorf("writeFaultNodesToCM: nodeDataToCm failed %s", util.SafePrint(encodeErr))
}
// getFaultNodeToCm converts every node in realFaultNode to its configmap form.
// The result has one entry per node; its order follows map iteration and is
// therefore not stable between calls.
func getFaultNodeToCm(realFaultNode map[string]*FaultNode) []FaultNodeInfoToCm {
	infos := make([]FaultNodeInfoToCm, len(realFaultNode))
	next := 0
	for _, node := range realFaultNode {
		infos[next] = initFaultNodeToCmByFaultNode(node)
		next++
	}
	return infos
}
// initFaultNodeToCmByFaultNode copies the fields of fNode that are written to
// the configmap into a FaultNodeInfoToCm. fNode must not be nil.
func initFaultNodeToCmByFaultNode(fNode *FaultNode) FaultNodeInfoToCm {
	var info FaultNodeInfoToCm
	info.NodeName = fNode.NodeName
	info.NodeHealthState = fNode.NodeHealthState
	info.NodeDEnable = fNode.NodeDEnable
	info.UpdateTime = fNode.UpdateTime
	info.UnhealthyNPU = fNode.UnhealthyNPU
	info.NetworkUnhealthyNPU = fNode.NetworkUnhealthyNPU
	info.FaultDeviceList = fNode.FaultDeviceList
	return info
}
// getRemainTimesToCMString returns JobRemainRetryTimes encoded as JSON text,
// or "" when JobRemainRetryTimes is empty.
func (reCache *DealReSchedulerCache) getRemainTimesToCMString() (string, error) {
	if len(reCache.JobRemainRetryTimes) == 0 {
		return "", nil
	}
	remainTimes, encodeErr := reCache.marshalCacheDataToString(reCache.JobRemainRetryTimes)
	if encodeErr == nil {
		return remainTimes, nil
	}
	return "", fmt.Errorf("getRemainTimesToCMString: %s", util.SafePrint(encodeErr))
}
// writeRescheduleReasonsToCMString returns JobRecentRescheduleRecords encoded as JSON text.
//
// It returns "" when there are no records. When the encoded text is longer than
// MaxKbOfRescheduleRecords, it removes the last record of every job that holds more than
// one record and encodes again, repeating for at most MaxRescheduleRecordsNum rounds or
// until the text fits. Trimming mutates the cached records in place. If the text still
// does not fit once trimming is exhausted, the oversized text is returned as is.
//
// An error is returned only when encoding fails.
func (reCache *DealReSchedulerCache) writeRescheduleReasonsToCMString() (string, error) {
	if len(reCache.JobRecentRescheduleRecords) == 0 {
		return "", nil
	}
	rescheduleReasonStr, err := reCache.marshalCacheDataToString(reCache.JobRecentRescheduleRecords)
	if err != nil {
		return "", fmt.Errorf("writeRescheduleReasonsToCMString: %s", util.SafePrint(err))
	}
	if len(rescheduleReasonStr) <= MaxKbOfRescheduleRecords {
		return rescheduleReasonStr, nil
	}
	klog.V(util.LogWarningLev).Infof("reason of reschedule into configmap is more than %d Kb, "+
		"will reduce it", MaxKbOfRescheduleRecords)
	for i := 0; len(rescheduleReasonStr) > MaxKbOfRescheduleRecords && i < MaxRescheduleRecordsNum; i++ {
		trimmed := false
		for jobID, reason := range reCache.JobRecentRescheduleRecords {
			// A nil entry appears when the stored JSON held null for a job; there is
			// nothing to trim and dereferencing it would panic.
			if reason == nil || len(reason.RescheduleRecords) <= 1 {
				continue
			}
			lastIdx := len(reason.RescheduleRecords) - 1
			lastRecord := reason.RescheduleRecords[lastIdx]
			reason.RescheduleRecords = reason.RescheduleRecords[:lastIdx]
			klog.V(util.LogWarningLev).Infof("cut job %v reschedule reason of timestamp %d from cm, "+
				"to reduce records length", jobID, lastRecord.RescheduleTimeStamp)
			trimmed = true
		}
		if !trimmed {
			// Every job is already down to at most one record, so another round
			// would only re-encode identical data.
			break
		}
		rescheduleReasonStr, err = reCache.marshalCacheDataToString(reCache.JobRecentRescheduleRecords)
		if err != nil {
			return "", fmt.Errorf("writeRescheduleReasonsToCMString: %s", util.SafePrint(err))
		}
	}
	return rescheduleReasonStr, nil
}
// WriteReSchedulerCacheToEnvCache publishes the rescheduling cache into env.OutputCache.
//
// Under RePropertyName it registers the configmap name and namespace and stores the
// fault-node text (CmFaultNodeKind) and the remaining-retry text (CmJobRemainRetryTimes).
// It also narrows FaultJobs to the jobs that still need rescheduling and writes the
// reschedule reasons through setRescheduleReasonToCache.
//
// Encoding failures of the individual parts are only logged at debug level; the
// affected value is then written as "". The only error returned is for a nil
// receiver or nil env. jobType is not used by this function.
func (reCache *DealReSchedulerCache) WriteReSchedulerCacheToEnvCache(env *plugin.ScheduleEnv, jobType string) error {
	if reCache == nil || env == nil {
		return errors.New(util.ArgumentError)
	}
	env.OutputCache.Names[RePropertyName] = CmName
	env.OutputCache.Namespaces[RePropertyName] = CmNameSpace

	faultNodesText, nodesErr := reCache.writeFaultNodesToCMString()
	if nodesErr != nil {
		klog.V(util.LogDebugLev).Infof("WriteReSchedulerCacheToEnvCache: %s", util.SafePrint(nodesErr))
	}

	realFaultJobs := reCache.getRealFaultJobs()
	reCache.FaultJobs = getRealFaultJobForCache(realFaultJobs)

	remainTimesText, remainErr := reCache.getRemainTimesToCMString()
	if remainErr != nil {
		klog.V(util.LogDebugLev).Infof("getRemainTimesToCMString: %s", util.SafePrint(remainErr))
	}

	reasonErr := reCache.setRescheduleReasonToCache(env)
	if reasonErr != nil {
		klog.V(util.LogDebugLev).Infof("setRescheduleReasonToCache: %s", util.SafePrint(reasonErr))
	}

	// Reuse the existing data section for this property, creating it on first use.
	section, found := env.OutputCache.Data[RePropertyName]
	if !found {
		section = make(map[string]string, util.MapInitNum)
		env.OutputCache.Data[RePropertyName] = section
	}
	section[CmJobRemainRetryTimes] = remainTimesText
	section[CmFaultNodeKind] = faultNodesText
	return nil
}
// setRescheduleReasonToCache registers the reschedule-reason configmap name and
// namespace under ReschedulingReasonKey in env.OutputCache and stores the encoded
// reschedule reasons under CmJobRescheduleReasonsKey in that key's data section.
//
// The name and namespace are registered even when encoding fails; in that case
// the data section is left untouched and an error is returned.
func (reCache *DealReSchedulerCache) setRescheduleReasonToCache(env *plugin.ScheduleEnv) error {
	env.OutputCache.Names[ReschedulingReasonKey] = RescheduleReasonCmName
	env.OutputCache.Namespaces[ReschedulingReasonKey] = RescheduleReasonCmNamespace

	reasonsText, encodeErr := reCache.writeRescheduleReasonsToCMString()
	if encodeErr != nil {
		klog.V(util.LogDebugLev).Infof("writeRescheduleReasonsToCMString: %s", util.SafePrint(encodeErr))
		return fmt.Errorf("writeRescheduleReasonsToCMString: %s", util.SafePrint(encodeErr))
	}

	// Reuse the existing data section for this key, creating it on first use.
	section, found := env.OutputCache.Data[ReschedulingReasonKey]
	if !found {
		section = make(map[string]string, util.MapInitNum)
		env.OutputCache.Data[ReschedulingReasonKey] = section
	}
	section[CmJobRescheduleReasonsKey] = reasonsText
	return nil
}
// judgePublicFaultInReason reports whether any entry of fTask.Reason has
// FaultType equal to PublicFaultType. A nil fTask yields false.
func judgePublicFaultInReason(fTask *miniFaultTask) bool {
	if fTask != nil {
		for idx := range fTask.Reason {
			if fTask.Reason[idx].FaultType == PublicFaultType {
				return true
			}
		}
	}
	return false
}