Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package manualfault
import (
"encoding/json"
"fmt"
"sort"
"sync"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"ascend-common/api"
"ascend-common/common-utils/hwlog"
"ascend-common/common-utils/utils"
"clusterd/pkg/common/constant"
"clusterd/pkg/common/util"
"clusterd/pkg/interface/kube"
)
var LastCmInfo map[string]NodeCmInfo
var FaultCmInfo Cache
type Cache struct {
manualInfo map[string]NodeCmInfo
mutex sync.RWMutex
}
type NodeCmInfo struct {
Total []string
Detail map[string][]DevCmInfo
}
type DevCmInfo struct {
FaultCode string
FaultLevel string
LastSeparateTime int64
}
func init() {
InitFaultCmInfo()
}
func InitFaultCmInfo() {
FaultCmInfo = Cache{
manualInfo: make(map[string]NodeCmInfo),
mutex: sync.RWMutex{},
}
LastCmInfo = make(map[string]NodeCmInfo)
}
func (c *Cache) SetNodeInfo(nodeInfo map[string]NodeCmInfo) {
c.mutex.Lock()
defer c.mutex.Unlock()
if len(nodeInfo) == 0 {
c.manualInfo = make(map[string]NodeCmInfo)
return
}
c.manualInfo = nodeInfo
}
func (c *Cache) Len() int {
c.mutex.RLock()
defer c.mutex.RUnlock()
return len(c.manualInfo)
}
func (c *Cache) DeepCopy() (map[string]NodeCmInfo, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
result := new(map[string]NodeCmInfo)
if err := util.DeepCopy(result, c.manualInfo); err != nil {
return nil, err
}
return *result, nil
}
func (c *Cache) AddSeparateDev(faultInfo FaultInfo) {
devInfo := DevCmInfo{
FaultCode: faultInfo.FaultCode,
FaultLevel: constant.ManuallySeparateNPU,
LastSeparateTime: faultInfo.ReceiveTime,
}
c.mutex.Lock()
defer c.mutex.Unlock()
var addMsg = fmt.Sprintf("node: %s, dev: %s, code: %s is not found in manual fault cache, add",
faultInfo.NodeName, faultInfo.DevName, faultInfo.FaultCode)
var updateMsg = fmt.Sprintf("node: %s, dev: %s, code: %s is found in manual fault cache, update last separate time",
faultInfo.NodeName, faultInfo.DevName, faultInfo.FaultCode)
info, ok := c.manualInfo[faultInfo.NodeName]
if !ok {
hwlog.RunLog.Infof(addMsg)
c.manualInfo[faultInfo.NodeName] = NodeCmInfo{
Total: []string{faultInfo.DevName},
Detail: map[string][]DevCmInfo{faultInfo.DevName: {devInfo}},
}
return
}
if !utils.Contains(info.Total, faultInfo.DevName) {
hwlog.RunLog.Infof(addMsg)
info.Total = append(info.Total, faultInfo.DevName)
info.Detail[faultInfo.DevName] = []DevCmInfo{devInfo}
c.manualInfo[faultInfo.NodeName] = info
return
}
var found bool
for idx, fault := range info.Detail[faultInfo.DevName] {
if fault.FaultCode == faultInfo.FaultCode {
found = true
hwlog.RunLog.Infof(updateMsg)
fault.LastSeparateTime = faultInfo.ReceiveTime
info.Detail[faultInfo.DevName][idx] = fault
break
}
}
if !found {
hwlog.RunLog.Infof(addMsg)
info.Detail[faultInfo.DevName] = append(info.Detail[faultInfo.DevName], devInfo)
c.manualInfo[faultInfo.NodeName] = info
}
return
}
func (c *Cache) HasDevManualSep(nodeName, devName string) bool {
c.mutex.Lock()
defer c.mutex.Unlock()
info, ok := c.manualInfo[nodeName]
if !ok {
return false
}
return utils.Contains(info.Total, devName)
}
func (c *Cache) DeleteSeparateDev(nodeName, devId string) {
c.mutex.Lock()
defer c.mutex.Unlock()
info, ok := c.manualInfo[nodeName]
if !ok {
return
}
info.Total = utils.Remove(info.Total, devId)
if len(info.Total) == 0 {
delete(c.manualInfo, nodeName)
return
}
delete(info.Detail, devId)
c.manualInfo[nodeName] = info
}
func (c *Cache) DeleteDevCode(nodeName, devId, faultCode string) {
c.mutex.Lock()
defer c.mutex.Unlock()
info, ok := c.manualInfo[nodeName]
if !ok {
return
}
devList, ok := info.Detail[devId]
if ok {
for idx, fault := range devList {
if fault.FaultCode == faultCode {
info.Detail[devId] = append(info.Detail[devId][:idx], info.Detail[devId][idx+1:]...)
break
}
}
}
if len(info.Detail[devId]) == 0 {
delete(info.Detail, devId)
if len(info.Detail) == 0 {
delete(c.manualInfo, nodeName)
return
}
info.Total = utils.Remove(info.Total, devId)
}
c.manualInfo[nodeName] = info
}
func GetSepNPUByCurrentCmInfo(cm *v1.ConfigMap) map[string][]string {
if cm == nil {
return nil
}
cmInfo, err := ParseManualCm(cm)
if err != nil {
hwlog.RunLog.Errorf("parse manual cm failed, error: %v", err)
return nil
}
return GetSeparateNPU(cmInfo)
}
func GetSepNPUByLastCmInfo() map[string][]string {
return GetSeparateNPU(LastCmInfo)
}
func GetSeparateNPU(nodeInfo map[string]NodeCmInfo) map[string][]string {
if len(nodeInfo) == 0 {
return nil
}
var separateNPU = make(map[string][]string)
for nodeName, info := range nodeInfo {
for _, dev := range info.Total {
separateNPU[nodeName] = append(separateNPU[nodeName], dev)
}
}
return separateNPU
}
func ParseManualCm(cm *v1.ConfigMap) (map[string]NodeCmInfo, error) {
manualCmInfo := map[string]NodeCmInfo{}
if cm.Data == nil {
return nil, fmt.Errorf("cm has no data")
}
for node, info := range cm.Data {
nodeInfo := NodeCmInfo{}
if err := json.Unmarshal([]byte(info), &nodeInfo); err != nil {
return nil, fmt.Errorf("unmarshal configmap <%s/%s> key %s failed: %v", cm.Namespace, cm.Name, node, err)
}
manualCmInfo[node] = nodeInfo
}
return manualCmInfo, nil
}
func DeleteManualCm() {
if err := kube.DeleteConfigMap(constant.ManualDevInfoCmName, api.ClusterNS); err != nil {
if !errors.IsNotFound(err) {
hwlog.RunLog.Errorf("manually separate npu info is nill, delete configmap failed, error: %v", err)
return
}
hwlog.RunLog.Debugf("cm<%s/%s> not found, ignore", api.ClusterNS, constant.ManualDevInfoCmName)
}
LastCmInfo = make(map[string]NodeCmInfo)
}
func TryGetManualCm() (*v1.ConfigMap, error) {
const retryTime = 3
const retryInterval = 500 * time.Millisecond
var cm *v1.ConfigMap
var err error
for i := 0; i < retryTime; i++ {
cm, err = GetManualCm()
if err != nil {
hwlog.RunLog.Errorf("get cm <%s/%s> info failed, error: %v, retry",
api.ClusterNS, constant.ManualDevInfoCmName, err)
time.Sleep(retryInterval)
continue
}
break
}
return cm, err
}
func GetManualCm() (*v1.ConfigMap, error) {
cm, err := kube.GetConfigMap(constant.ManualDevInfoCmName, api.ClusterNS)
if err != nil {
if !errors.IsNotFound(err) {
hwlog.RunLog.Errorf("manually separate npu info is nil, get cm <%s/%s> failed, err: %v",
api.ClusterNS, constant.ManualDevInfoCmName, err)
return nil, err
}
hwlog.RunLog.Debugf("cm <%s/%s> not found, ignore", api.ClusterNS, constant.ManualDevInfoCmName)
return nil, nil
}
return cm, nil
}
func UpdateOrCreateManualCm() {
currentCmInfo, err := FaultCmInfo.DeepCopy()
if err != nil {
hwlog.RunLog.Errorf("deep copy fault cm info failed, error: %v", err)
return
}
if len(currentCmInfo) == 0 {
DeleteManualCm()
return
}
for _, info := range currentCmInfo {
sort.Strings(info.Total)
}
data := ConvertNodeInfoToCmData(currentCmInfo)
if err := kube.UpdateOrCreateConfigMap(constant.ManualDevInfoCmName, api.ClusterNS, data, nil); err != nil {
hwlog.RunLog.Errorf("manually separate npu info is nil, update configmap err: %v", err)
return
}
LastCmInfo = currentCmInfo
}
func ConvertNodeInfoToCmData(cmInfo map[string]NodeCmInfo) map[string]string {
cmData := make(map[string]string)
for node, info := range cmInfo {
data, err := json.Marshal(info)
if err != nil {
hwlog.RunLog.Errorf("marshal node info to json string failed, error: %v", err)
continue
}
cmData[node] = string(data)
}
return cmData
}