Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
"Ascend-device-plugin/pkg/common"
"Ascend-device-plugin/pkg/device"
"Ascend-device-plugin/pkg/next/devicefactory/customname"
"ascend-common/api"
"ascend-common/common-utils/hwlog"
"ascend-common/devmanager/dcmi"
)
// softShareDevAnnotations collects the string values that are written into a soft-share
// device config file: the device identifiers (physical NPU, virtual NPU and die) and the
// quota / scheduling-policy values taken from the pod's soft-share annotations.
type softShareDevAnnotations struct {
	// identifiers filled in while resolving the allocated device
	physicalId, vNPUId, dieId string
	// values copied from the pod's soft-share scheduling annotations
	aicoreQuota, hbmQuota, schedulingPolicy string
}
// stopListAndWatch marks the plugin as no longer running and, only if it was running, sends a
// signal on ps.stop so that the ListAndWatch loop returns.
// NOTE(review): the send blocks until ListAndWatch receives it; confirm ps.stop is buffered or
// that ListAndWatch is always still selecting while isRunning is true.
func (ps *PluginServer) stopListAndWatch() {
	if !ps.isRunning.Load() {
		return
	}
	ps.isRunning.Store(false)
	ps.stop <- struct{}{}
}
// Notify replaces the cached device list with copies of devices and wakes ListAndWatch (via
// ps.reciChan) so that the new list is reported to kubelet.
// It returns false, without touching the cache, for a nil receiver or when ListAndWatch is not
// running; otherwise it returns true after the wake-up has been delivered.
func (ps *PluginServer) Notify(devices []*common.NpuDevice) bool {
	if ps == nil {
		hwlog.RunLog.Error("invalid interface receiver")
		return false
	}
	if !ps.isRunning.Load() {
		return false
	}
	ps.deepCopyDevice(devices)
	ps.reciChan <- struct{}{}
	return true
}
// getUnhealthyAICore returns the names of the AI-core devices that must be reported as Unhealthy.
//
// Every physical chip marked Unhealthy in ps.cachedDevices accounts for GetChipAICore() unhealthy
// cores. Cores that pods actually use on such chips are selected first, and the remainder is filled
// from the free (unused) cores in sorted order. If any inconsistency is detected, an error is logged
// and the partial set collected so far is returned. If the real usage cannot be queried, an empty
// set is returned.
//
// ps.cachedDevices is read here without locking; the visible caller (responseToKubelet) holds
// ps.cachedLock for reading while calling this.
func (ps *PluginServer) getUnhealthyAICore() sets.String {
	unhealthyPhyID := sets.Int{}
	allAICore := make(sets.String, len(ps.cachedDevices))
	for _, device := range ps.cachedDevices {
		if device.Health == v1beta1.Unhealthy {
			unhealthyPhyID.Insert(int(device.PhyID))
		}
		allAICore.Insert(device.DeviceName)
	}
	realUsedAICore, err := ps.GetRealUsedAICore()
	if err != nil {
		hwlog.RunLog.Errorf("failed to get real used AICore device, %v", err)
		return sets.String{}
	}
	unhealthyAICore := sets.String{}
	usedAICore := sets.String{}
	for coreName, realDev := range realUsedAICore {
		phyID, _, err := common.GetDeviceID(realDev, "")
		if err != nil {
			hwlog.RunLog.Warn(err)
			continue
		}
		if unhealthyPhyID.Has(phyID) {
			unhealthyAICore.Insert(coreName)
		}
		usedAICore.Insert(coreName)
	}
	// Read once. The value is used as a divisor below, and a zero core count would otherwise
	// panic with an integer divide by zero.
	chipAICore := int(ps.manager.GetChipAICore())
	if chipAICore <= 0 {
		hwlog.RunLog.Errorf("the ai core num %d of chip is invalid", chipAICore)
		return unhealthyAICore
	}
	// guard the multiplication below against int overflow
	if unhealthyPhyID.Len() > math.MaxInt/chipAICore {
		hwlog.RunLog.Errorf("the num of unhealthy device %d is invalid", unhealthyPhyID.Len())
		return unhealthyAICore
	}
	leftUnhealthyAICoreNum := unhealthyPhyID.Len()*chipAICore - unhealthyAICore.Len()
	if leftUnhealthyAICoreNum < 0 {
		hwlog.RunLog.Errorf("num of left unhealthy ai core %d is less than 0", leftUnhealthyAICoreNum)
		return unhealthyAICore
	}
	freeAICore := allAICore.Difference(usedAICore)
	if freeAICore.Len() < leftUnhealthyAICoreNum {
		hwlog.RunLog.Errorf("free ai core device num is %d, while need %d", freeAICore.Len(), leftUnhealthyAICoreNum)
		return unhealthyAICore
	}
	// List() is sorted, so the extra unhealthy cores are chosen deterministically.
	freeList := freeAICore.List()
	for count := 0; count < leftUnhealthyAICoreNum; count++ {
		unhealthyAICore.Insert(freeList[count])
	}
	return unhealthyAICore
}
// GetRealUsedAICore maps each kubelet-allocated AI-core ID held by an active pod (from the
// pod cache) to the first real device recorded for that pod. Pods without any real device
// are skipped. An error from resolving the pod devices is wrapped and returned.
func (ps *PluginServer) GetRealUsedAICore() (map[string]string, error) {
	activePods := ps.manager.GetKubeClient().GetActivePodListCache()
	infos, err := ps.GetKltAndRealAllocateDev(activePods)
	if err != nil {
		return nil, fmt.Errorf("failed to get klt and real allocate device, %w", err)
	}
	core2Real := make(map[string]string, len(infos))
	for _, info := range infos {
		hwlog.RunLog.Debugf("pod info name: %s, status:%s, uid:%s", info.Pod.Name,
			info.Pod.Status.Phase, info.Pod.UID)
		if len(info.RealDevice) > 0 {
			firstReal := info.RealDevice[0]
			for _, coreName := range info.KltDevice {
				core2Real[coreName] = firstReal
			}
		}
	}
	return core2Real, nil
}
// generateAllDeviceMap builds a map from each cached (real) device name to a kubelet device name.
//
// A cached device that already appears as a value in ps.klt2RealDevMap is mapped to its recorded
// kubelet name. The remaining cached devices are then paired, in cache order, with the cached names
// not yet used as a kubelet name (in sorted order). Surplus names are logged and left unmapped.
//
// It read-locks ps.allocMapLock and then ps.cachedLock itself, and releases both before
// returning. Callers must therefore not already hold ps.cachedLock, because sync.RWMutex does
// not support recursive read locking.
func (ps *PluginServer) generateAllDeviceMap() map[string]string {
	result := make(map[string]string, 1)
	var unmappedReal = make([]string, 0)
	cachedNames := sets.String{}
	mappedKlt := sets.String{}

	ps.allocMapLock.RLock()
	ps.cachedLock.RLock()
	real2Klt := make(map[string]string, len(ps.klt2RealDevMap))
	for kltName, realName := range ps.klt2RealDevMap {
		real2Klt[realName] = kltName
	}
	for _, dev := range ps.cachedDevices {
		name := dev.DeviceName
		cachedNames.Insert(name)
		if kltName, ok := real2Klt[name]; ok {
			mappedKlt.Insert(kltName)
			result[name] = kltName
		} else {
			unmappedReal = append(unmappedReal, name)
		}
	}
	ps.allocMapLock.RUnlock()
	ps.cachedLock.RUnlock()

	freeKlt := cachedNames.Difference(mappedKlt).List()
	for idx, kltName := range freeKlt {
		if idx < len(unmappedReal) {
			result[unmappedReal[idx]] = kltName
			continue
		}
		hwlog.RunLog.Warnf("found volcano not using device %s in notInVolDev on local %d failed", kltName, idx)
	}
	return result
}
// sendToKubelet pushes one ListAndWatch response onto the kubelet stream and returns the
// stream's send error, if any.
func sendToKubelet(stream v1beta1.DevicePlugin_ListAndWatchServer, resp *v1beta1.ListAndWatchResponse) error {
	sendErr := stream.Send(resp)
	return sendErr
}
// addSoftShareDev appends common.ParamOption.ShareCount entries for one device to resp. The
// entries are named "<newDeviceName>-0" through "<newDeviceName>-<ShareCount-1>", and all of
// them carry device.Health.
func (ps *PluginServer) addSoftShareDev(resp *v1beta1.ListAndWatchResponse, newDeviceName string,
	device common.NpuDevice) {
	hwlog.RunLog.Infof("%s add soft share device to kubelet", newDeviceName)
	shareCount := int(common.ParamOption.ShareCount)
	for idx := 0; idx < shareCount; idx++ {
		shareID := fmt.Sprintf("%s-%d", newDeviceName, idx)
		resp.Devices = append(resp.Devices, &v1beta1.Device{ID: shareID, Health: device.Health})
	}
}
// responseToKubelet builds the ListAndWatch response from the cached device list.
//
// Three modes are handled:
//   - PresetVDevice off: every cached device is reported. The ones returned by getUnhealthyAICore
//     are marked Unhealthy and all others Healthy.
//   - PresetVDevice and volcano scheduling on a non-virtual device type: each cached device is
//     reported under the kubelet name that generateAllDeviceMap maps it to. When soft-share is
//     supported, each device is expanded into ShareCount entries. Unmapped devices are skipped.
//   - otherwise: every cached device is reported with its cached health.
//
// Device IDs are converted to public names with customname.ReplaceDevicePublicName.
func (ps *PluginServer) responseToKubelet() *v1beta1.ListAndWatchResponse {
	resp := new(v1beta1.ListAndWatchResponse)
	presetVDevice := common.ParamOption.PresetVDevice
	useVolcanoMap := presetVDevice && common.ParamOption.UseVolcanoType && !common.IsVirtualDev(ps.deviceType)
	var vol2kltMap map[string]string
	if useVolcanoMap {
		// generateAllDeviceMap takes cachedLock.RLock itself. Calling it while this goroutine
		// already holds cachedLock.RLock is recursive read locking, which sync.RWMutex forbids:
		// a deepCopyDevice (cachedLock.Lock) queued in between would deadlock. So the map is
		// built before taking the lock. A device swapped in during the gap is simply skipped,
		// and Notify sends on reciChan right after deepCopyDevice, which triggers another report.
		vol2kltMap = ps.generateAllDeviceMap()
	}
	ps.cachedLock.RLock()
	defer ps.cachedLock.RUnlock()
	switch {
	case !presetVDevice:
		unhealthyDev := ps.getUnhealthyAICore()
		for _, device := range ps.cachedDevices {
			// device is a copy of the cached value, so this health override never touches the cache
			if unhealthyDev.Has(device.DeviceName) {
				device.Health = v1beta1.Unhealthy
			} else {
				device.Health = v1beta1.Healthy
			}
			hwlog.RunLog.Infof("ListAndWatch resp devices: %s %s", device.DeviceName, device.Health)
			newDeviceName := customname.ReplaceDevicePublicName(ps.deviceType, device.DeviceName)
			resp.Devices = append(resp.Devices, &v1beta1.Device{ID: newDeviceName, Health: device.Health})
		}
	case useVolcanoMap:
		for _, device := range ps.cachedDevices {
			d, exist := vol2kltMap[device.DeviceName]
			if !exist {
				hwlog.RunLog.Warnf(" not exist map key, %s map %+v", device.DeviceName, vol2kltMap)
				continue
			}
			hwlog.RunLog.Infof("ListAndWatch resp devices: inner device: %s %s, real device: %s %s, ",
				d, device.Health, device.DeviceName, device.Health)
			newDeviceName := customname.ReplaceDevicePublicName(ps.deviceType, d)
			if common.IsSupportSoftShareDevice() {
				ps.addSoftShareDev(resp, newDeviceName, device)
				continue
			}
			resp.Devices = append(resp.Devices, &v1beta1.Device{ID: newDeviceName, Health: device.Health})
		}
	default:
		for _, device := range ps.cachedDevices {
			hwlog.RunLog.Infof("ListAndWatch resp devices: %s %s", device.DeviceName, device.Health)
			newDeviceName := customname.ReplaceDevicePublicName(ps.deviceType, device.DeviceName)
			resp.Devices = append(resp.Devices, &v1beta1.Device{ID: newDeviceName, Health: device.Health})
		}
	}
	hwlog.RunLog.Debugf("response to kubelet resp.devices len: %+v", len(resp.Devices))
	return resp
}
// deepCopyDevice replaces ps.cachedDevices with value copies (name, health and physical ID) of the
// given devices, so that later changes to the caller's structs do not leak into the cache. The
// existing backing array of ps.cachedDevices is reused. Nil entries are skipped with a warning
// rather than dereferenced. The write lock on ps.cachedLock is held for the whole update.
func (ps *PluginServer) deepCopyDevice(cachedDevices []*common.NpuDevice) {
	ps.cachedLock.Lock()
	defer ps.cachedLock.Unlock()
	ps.cachedDevices = ps.cachedDevices[:0]
	for _, dev := range cachedDevices {
		if dev == nil {
			hwlog.RunLog.Warn("skip nil device when caching devices")
			continue
		}
		ps.cachedDevices = append(ps.cachedDevices, common.NpuDevice{
			DeviceName: dev.DeviceName,
			Health:     dev.Health,
			PhyID:      dev.PhyID,
		})
	}
}
// LastSendSuccess reports whether the most recent attempt to send device info to kubelet
// succeeded, as recorded in ps.deviceSyncStat.
func (ps *PluginServer) LastSendSuccess() bool {
	succeeded := ps.deviceSyncStat.GetLastSendStatus()
	return succeeded
}
// strategyForSendStats chooses a reaction to the recorded send results, checked in this order:
//   - EmptyStrategy after a successful last send;
//   - ReStartDevicePluginStrategy once consecutive failures reach FailureCountThresholdForRestart;
//   - ReRegistryStrategy once they reach FailureCountThresholdForReRegistry;
//   - EmptyStrategy otherwise.
func (ps *PluginServer) strategyForSendStats() string {
	switch {
	case ps.LastSendSuccess():
		return common.EmptyStrategy
	case ps.deviceSyncStat.GetConsecutiveFailures() >= common.FailureCountThresholdForRestart:
		return common.ReStartDevicePluginStrategy
	case ps.deviceSyncStat.GetConsecutiveFailures() >= common.FailureCountThresholdForReRegistry:
		return common.ReRegistryStrategy
	default:
		return common.EmptyStrategy
	}
}
// exitSelfProcess sends SIGTERM to the current process. It returns any error from locating
// or signalling the process.
func exitSelfProcess() error {
	hwlog.RunLog.Warn("process prepare exit")
	self, err := os.FindProcess(os.Getpid())
	if err != nil {
		return err
	}
	return self.Signal(syscall.SIGTERM)
}
// reportDeviceInfo sends the current device list to kubelet, making up to common.RetryUpdateCount
// attempts. The response is rebuilt for each attempt, and there is no delay between attempts.
// The outcome is recorded once in ps.deviceSyncStat: success as soon as one send succeeds,
// failure if every attempt fails.
func (ps *PluginServer) reportDeviceInfo(stream v1beta1.DevicePlugin_ListAndWatchServer) {
	for attempt := 0; attempt < common.RetryUpdateCount; attempt++ {
		sendErr := sendToKubelet(stream, ps.responseToKubelet())
		if sendErr == nil {
			hwlog.RunLog.Infof("send device-info to kubelet success, deviceType=%s", ps.deviceType)
			ps.deviceSyncStat.RecordSendResult(true)
			return
		}
		hwlog.RunLog.Errorf("send to kubelet failed, error is %v", sendErr)
	}
	ps.deviceSyncStat.RecordSendResult(false)
	hwlog.RunLog.Errorf("the number of retries (%d) retries send failed.", common.RetryUpdateCount)
}
// handleConsecutiveErrorStrategy applies the strategy chosen by strategyForSendStats.
//   - EmptyStrategy does nothing.
//   - ReRegistryStrategy marks the plugin stopped and sets the restart flag.
//   - ReStartDevicePluginStrategy marks the plugin stopped and sends SIGTERM to this process,
//     falling back to the restart flag if signalling fails.
//   - Any other value is logged as unsupported.
func (ps *PluginServer) handleConsecutiveErrorStrategy(strategy string, serverRestartCount uint64) {
	switch strategy {
	case common.EmptyStrategy:
		return
	case common.ReRegistryStrategy, common.ReStartDevicePluginStrategy:
		// handled below
	default:
		hwlog.RunLog.Errorf("not support strategy=%s", strategy)
		return
	}
	hwlog.RunLog.Infof("strategy for sendStat=%s, %s, server restartTimes=%d",
		strategy, ps.deviceSyncStat.String(), serverRestartCount)
	ps.isRunning.Store(false)
	if strategy != common.ReStartDevicePluginStrategy {
		ps.SetRestartFlag(true)
		return
	}
	if err := exitSelfProcess(); err != nil {
		hwlog.RunLog.Errorf("restart device plugin failed, err=%v", err)
		ps.SetRestartFlag(true)
	}
}
// ListAndWatch implements the kubelet device-plugin ListAndWatch RPC.
//
// It marks the plugin as running, sends the current device list once, and then loops until one
// of the following happens:
//   - the gRPC stream context is done: if the plugin was still marked running, it is marked
//     stopped and the restart flag is set;
//   - a value arrives on ps.stop (sent by stopListAndWatch): the plugin is marked stopped;
//   - a value arrives on ps.reciChan (sent by Notify): the device list is re-sent. If
//     strategyForSendStats then returns a non-empty strategy, it is applied and the stream ends.
//
// It always returns nil. The empty request argument is unused.
func (ps *PluginServer) ListAndWatch(empty *v1beta1.Empty, stream v1beta1.DevicePlugin_ListAndWatchServer) error {
	restartCount := ps.restartTimes.Load() - 1
	hwlog.RunLog.Infof("receive ListAndWatch from kubelet, deviceType=%s, server restartTimes=%d",
		ps.deviceType, restartCount)
	ps.isRunning.Store(true)
	ps.reportDeviceInfo(stream)
	for {
		select {
		case <-stream.Context().Done():
			hwlog.RunLog.Warnf("grpc stream closed, deviceType=%s, server restartTimes=%d",
				ps.deviceType, restartCount)
			if !ps.isRunning.Load() {
				return nil
			}
			hwlog.RunLog.Infof("server should running, restart it, restartTimes=%d", restartCount)
			ps.isRunning.Store(false)
			ps.SetRestartFlag(true)
			return nil
		case <-ps.stop:
			hwlog.RunLog.Warnf("ps.stop chan receive exit signal, deviceType=%s, server restartTimes=%d",
				ps.deviceType, restartCount)
			ps.isRunning.Store(false)
			return nil
		case <-ps.reciChan:
			ps.reportDeviceInfo(stream)
			strategy := ps.strategyForSendStats()
			ps.handleConsecutiveErrorStrategy(strategy, restartCount)
			if strategy == common.EmptyStrategy {
				continue
			}
			return nil
		}
	}
}
// deviceExists reports whether a device named id is present in the cached device list.
// It holds ps.cachedLock for reading during the lookup.
func (ps *PluginServer) deviceExists(id string) bool {
	ps.cachedLock.RLock()
	defer ps.cachedLock.RUnlock()
	for i := range ps.cachedDevices {
		if ps.cachedDevices[i].DeviceName == id {
			return true
		}
	}
	return false
}
// checkAllocateRequest validates a kubelet Allocate request. It checks that:
//   - the request is non-nil and has at most common.MaxContainerLimit container requests;
//   - each container requests at most MaxDevicesNum*MinAICoreNum device IDs;
//   - every requested device exists in the cache and its name is at most common.MaxDeviceNameLen
//     long. The name is checked after mapping to its inner name and, for soft-share, after
//     keeping only the first two "-"-separated fields;
//   - a container that requests a virtual device requests at most MaxRequestVirtualDeviceNum IDs.
//
// When a virtual device is requested, ps.ascendRuntimeOptions is set to common.VirtualDev.
func (ps *PluginServer) checkAllocateRequest(requests *v1beta1.AllocateRequest) error {
	if requests == nil {
		return fmt.Errorf("invalid requests")
	}
	if len(requests.ContainerRequests) > common.MaxContainerLimit {
		return fmt.Errorf("the number of container request %d exceeds the upper limit",
			len(requests.ContainerRequests))
	}
	maxDevicesPerContainer := common.MaxDevicesNum * common.MinAICoreNum
	for _, rqt := range requests.ContainerRequests {
		if len(rqt.DevicesIDs) > maxDevicesPerContainer {
			// report the limit that is actually enforced
			return fmt.Errorf("the devices can't bigger than %d", maxDevicesPerContainer)
		}
		allocateDevices := customname.ReplaceDeviceInnerName(ps.deviceType, rqt.DevicesIDs)
		for _, deviceName := range allocateDevices {
			if len(deviceName) > common.MaxDeviceNameLen {
				return fmt.Errorf("length of device name %d is invalid", len(deviceName))
			}
			if common.IsSupportSoftShareDevice() && strings.Count(deviceName, common.MiddelLine) > 1 {
				// soft-share IDs carry an extra "-<index>" suffix; check the underlying device
				deviceNameSlice := strings.Split(deviceName, common.MiddelLine)
				deviceName = deviceNameSlice[0] + common.MiddelLine + deviceNameSlice[1]
			}
			if !ps.deviceExists(deviceName) {
				return fmt.Errorf("plugin doesn't have device %s", deviceName)
			}
			if common.IsVirtualDev(deviceName) {
				if len(rqt.DevicesIDs) > common.MaxRequestVirtualDeviceNum {
					return fmt.Errorf("request more than %d virtual device, current is %d",
						common.MaxRequestVirtualDeviceNum, len(rqt.DevicesIDs))
				}
				// Keep validating the remaining devices and containers instead of returning
				// early, so that nothing later in the request bypasses the checks above.
				ps.ascendRuntimeOptions = common.VirtualDev
			}
		}
	}
	return nil
}
// getPredicateTimeFromPodAnnotation parses the scheduler's predicate timestamp annotation
// (common.PodPredicateTime) as an unsigned integer. It returns math.MaxUint64, and logs why,
// when the annotation is missing, longer than common.PodAnnotationMaxLength, or unparsable.
func getPredicateTimeFromPodAnnotation(pod *v1.Pod) uint64 {
	raw, found := pod.Annotations[common.PodPredicateTime]
	switch {
	case !found:
		hwlog.RunLog.Warnf("volcano not write timestamp, pod Name: %s", pod.Name)
		return math.MaxUint64
	case len(raw) > common.PodAnnotationMaxLength:
		hwlog.RunLog.Warnf("timestamp fmt invalid, pod Name: %s", pod.Name)
		return math.MaxUint64
	}
	parsed, err := strconv.ParseUint(raw, common.BaseDec, common.BitSize)
	if err != nil {
		hwlog.RunLog.Errorf("parse timestamp failed, %v", err)
		return math.MaxUint64
	}
	return parsed
}
// getOldestPod picks the pod with the smallest predicate timestamp; ties keep the earliest pod in
// the slice. It then marks the chosen pod as taken by updating its cached PodPredicateTime
// annotation to math.MaxUint64. It returns a pointer to a copy of the chosen pod, or nil when pods
// is empty or the annotation update fails.
func (ps *PluginServer) getOldestPod(pods []v1.Pod) *v1.Pod {
	if len(pods) == 0 {
		return nil
	}
	// Parse each pod's timestamp exactly once. Re-parsing the current candidate on every iteration
	// repeated its warning logs and did O(n) extra work. Index instead of ranging by value so the
	// large v1.Pod struct is not copied per iteration.
	oldestIdx := 0
	var oldestTime uint64
	for i := range pods {
		hwlog.RunLog.Debugf("pod %s, predicate time: %s", pods[i].Name, pods[i].Annotations[common.PodPredicateTime])
		podTime := getPredicateTimeFromPodAnnotation(&pods[i])
		if i == 0 || oldestTime > podTime {
			oldestIdx, oldestTime = i, podTime
		}
	}
	oldest := pods[oldestIdx]
	hwlog.RunLog.Debugf("oldest pod %#v, predicate time: %#v", oldest.Name,
		oldest.Annotations[common.PodPredicateTime])
	annotation := map[string]string{common.PodPredicateTime: strconv.FormatUint(math.MaxUint64, common.BaseDec)}
	if err := ps.manager.GetKubeClient().TryUpdatePodCacheAnnotation(&oldest, annotation); err != nil {
		hwlog.RunLog.Errorf("update pod %s failed, err: %v", oldest.Name, err)
		return nil
	}
	return &oldest
}
// updateAllocMap records the kubelet-ID to real-device mapping for one allocation. It dispatches
// to the preset-vNPU or dynamic variant according to common.ParamOption.PresetVDevice.
func (ps *PluginServer) updateAllocMap(realAlloc, kltAlloc []string) {
	if !common.ParamOption.PresetVDevice {
		ps.updateDynamicAllocMap(realAlloc, kltAlloc)
		return
	}
	ps.updatePresetAllocMap(realAlloc, kltAlloc)
}
// updateDynamicAllocMap records in ps.klt2RealDevMap which real device backs each kubelet-allocated
// AI-core ID (dynamic, non-preset mode).
//
// It first removes existing entries for the given kubelet IDs and any entries that point at one of
// the given real devices. It then assigns new entries:
//   - a single virtual device backs every kubelet ID;
//   - for physical devices, kubelet IDs are assigned to real devices in consecutive groups of
//     GetChipAICore().
//
// Inconsistent inputs are logged, and in that case only the removals are applied.
func (ps *PluginServer) updateDynamicAllocMap(realAlloc, kltAlloc []string) {
	if len(realAlloc) == 0 {
		hwlog.RunLog.Warn("not allocate any device")
		return
	}
	// klt2RealDevMap is read under allocMapLock (GetRealAllocateDevicesFromMap,
	// generateAllDeviceMap) and written under it by updatePresetAllocMap. Mutating it here without
	// the lock is a data race.
	ps.allocMapLock.Lock()
	defer ps.allocMapLock.Unlock()
	for _, id := range kltAlloc {
		delete(ps.klt2RealDevMap, id)
	}
	// single pass over the map instead of one full scan per real device
	realSet := sets.String{}
	realSet.Insert(realAlloc...)
	for kltID, realID := range ps.klt2RealDevMap {
		if realSet.Has(realID) {
			delete(ps.klt2RealDevMap, kltID)
		}
	}
	isVirtualDev := common.IsVirtualDev(realAlloc[0])
	if isVirtualDev && len(realAlloc) > 1 {
		hwlog.RunLog.Warnf("virtual device only support allocate one, %v", realAlloc)
		return
	}
	if isVirtualDev {
		for _, id := range kltAlloc {
			ps.klt2RealDevMap[id] = realAlloc[0]
		}
		return
	}
	chipAICore := ps.manager.GetChipAICore()
	// This also guarantees chipAICore != 0 whenever the loop below runs.
	if int(chipAICore)*len(realAlloc) != len(kltAlloc) {
		hwlog.RunLog.Warnf("klt allocate core not equal real allocate %v", realAlloc)
		return
	}
	realIdx := 0
	for kltIdx, id := range kltAlloc {
		if len(realAlloc) <= realIdx {
			hwlog.RunLog.Errorf("realAlloc index out of range[%d] with length %d", realIdx, len(realAlloc))
			break
		}
		ps.klt2RealDevMap[id] = realAlloc[realIdx]
		if ((kltIdx + 1) % int(chipAICore)) == 0 {
			realIdx++
		}
	}
}
// updatePresetAllocMap records kubelet-ID to real-device entries in ps.klt2RealDevMap (preset vNPU
// mode) while holding the allocMapLock write lock.
//   - With soft-share support, every kubelet ID maps to realAlloc[0], and realAlloc must be
//     non-empty.
//   - Otherwise the two slices must have equal length, and IDs are paired by position.
//
// On a validation failure the map is left untouched.
func (ps *PluginServer) updatePresetAllocMap(realAlloc, kltAlloc []string) {
	softShare := common.IsSupportSoftShareDevice()
	if softShare && len(realAlloc) == 0 {
		hwlog.RunLog.Error("number of devices of real allocate is 0")
		return
	}
	if !softShare && len(realAlloc) != len(kltAlloc) {
		hwlog.RunLog.Error("number of devices of klt allocate not equal real allocate")
		return
	}
	ps.allocMapLock.Lock()
	defer ps.allocMapLock.Unlock()
	for pos, kltID := range kltAlloc {
		// assigning overwrites any previous entry for kltID, so no separate delete is needed
		realID := realAlloc[0]
		if !softShare {
			realID = realAlloc[pos]
		}
		ps.klt2RealDevMap[kltID] = realID
	}
}
// GetRealAllocateDevicesFromMap translates kubelet-allocated IDs to real devices through
// ps.klt2RealDevMap. It returns the de-duplicated, sorted list of real devices. Without volcano
// scheduling the input is returned unchanged. An ID without a mapping is an error.
func (ps *PluginServer) GetRealAllocateDevicesFromMap(kltAllocate []string) ([]string, error) {
	if ps == nil {
		return nil, fmt.Errorf("invalid interface receiver when get real dev from map")
	}
	ps.allocMapLock.RLock()
	defer ps.allocMapLock.RUnlock()
	if !common.ParamOption.UseVolcanoType {
		return kltAllocate, nil
	}
	found := make(sets.String, len(kltAllocate))
	for _, kltID := range kltAllocate {
		realID, ok := ps.klt2RealDevMap[kltID]
		if !ok {
			return nil, fmt.Errorf("cannot found real allocate device by %s", kltID)
		}
		found.Insert(realID)
	}
	return found.List(), nil
}
// GetRealAllocateDevicesFromEnv returns the devices recorded in the pod's annotation for
// ps.deviceType. This is done only when a container's AscendVisibleDevicesEnv variable is filled
// through the downward API from exactly that annotation. It returns nil when no such env var is
// found or the annotation cannot be read.
func (ps *PluginServer) GetRealAllocateDevicesFromEnv(pod v1.Pod) []string {
	if ps == nil {
		hwlog.RunLog.Error("invalid interface receiver when get real dev from env")
		return nil
	}
	if len(pod.Spec.Containers) == 0 {
		hwlog.RunLog.Error("no container here")
		return nil
	}
	// the downward-API field path expected for this device type; it is the same for every env var
	wantFieldPath := fmt.Sprintf("%s['%s%s']",
		common.MetaDataAnnotation, api.ResourceNamePrefix, ps.deviceType)
	for _, container := range pod.Spec.Containers {
		if len(container.Env) == 0 {
			hwlog.RunLog.Debug("no env setting here")
			continue
		}
		for _, env := range container.Env {
			isFieldRefVisibleDev := env.Name == common.AscendVisibleDevicesEnv &&
				env.ValueFrom != nil && env.ValueFrom.FieldRef != nil
			if !isFieldRefVisibleDev {
				continue
			}
			if env.ValueFrom.FieldRef.FieldPath != wantFieldPath {
				hwlog.RunLog.Errorf("fieldPath in downward api is different from %v, "+
					"which may affect the mounting of device", ps.deviceType)
				continue
			}
			devices, err := common.GetDeviceFromPodAnnotation(&pod, ps.deviceType)
			if err != nil {
				hwlog.RunLog.Errorf("get volcano device err: %v", err)
				return nil
			}
			return devices
		}
	}
	hwlog.RunLog.Debug("maybe no downward api setting here")
	return nil
}
// GetKltAndRealAllocateDev pairs, for each pod in podList that holds this plugin's resource
// according to the kubelet pod-resources API, the kubelet-allocated IDs with the real devices.
//
// The real devices come from one of the following:
//   - the kubelet IDs themselves, for preset virtual device types;
//   - the allocation map or the real-device annotation (GetRealAllocateDevices);
//   - the downward-API env annotation, when present. In that case the allocation map is also
//     updated from it.
//
// Pods whose real devices cannot be resolved are logged and skipped. An error is returned only if
// querying pod resources fails; it wraps the cause.
func (ps *PluginServer) GetKltAndRealAllocateDev(podList []v1.Pod) ([]*common.PodDeviceInfo, error) {
	if ps == nil {
		return nil, fmt.Errorf("invalid interface receiver")
	}
	prClient := NewPodResource()
	podDevice, err := prClient.GetPodResource()
	if err != nil {
		// %w keeps the cause inspectable with errors.Is / errors.As
		return nil, fmt.Errorf("get pod resource failed, %w", err)
	}
	var podDeviceInfo = make([]*common.PodDeviceInfo, 0)
	for _, pod := range podList {
		podKey := pod.Namespace + common.UnderLine + pod.Name
		podResource, exist := podDevice[podKey]
		if !exist {
			continue
		}
		if podResource.ResourceName != api.ResourceNamePrefix+ps.deviceType {
			hwlog.RunLog.Debugf("podKey %s resource name %s not equal device type %s", podKey,
				podResource.ResourceName, ps.deviceType)
			continue
		}
		if common.ParamOption.PresetVDevice && common.IsVirtualDev(ps.deviceType) {
			hwlog.RunLog.Debugf("append pod(%s) %v", podKey, podResource.DeviceIds)
			podDeviceInfo = append(podDeviceInfo, &common.PodDeviceInfo{Pod: pod, KltDevice: podResource.DeviceIds,
				RealDevice: podResource.DeviceIds})
			continue
		}
		realDeviceList, err := ps.GetRealAllocateDevices(pod, podResource.DeviceIds)
		if err != nil {
			hwlog.RunLog.Warnf("%s %s", podKey, err)
			continue
		}
		volAllocatedDevices := ps.GetRealAllocateDevicesFromEnv(pod)
		if len(volAllocatedDevices) != 0 {
			realDeviceList = volAllocatedDevices
			ps.updateAllocMap(realDeviceList, podResource.DeviceIds)
			hwlog.RunLog.Debugf("get real devices:%v from env successfully", realDeviceList)
		}
		hwlog.RunLog.Debugf("append pod(%s) %v", podKey, realDeviceList)
		podDeviceInfo = append(podDeviceInfo, &common.PodDeviceInfo{Pod: pod, KltDevice: podResource.DeviceIds,
			RealDevice: realDeviceList})
	}
	return podDeviceInfo, nil
}
// GetRealAllocateDevices resolves the real devices behind kubelet-allocated IDs. It first tries
// the allocation map. If that fails, it splits the pod's api.PodAnnotationAscendReal annotation on
// commas and records the result in the allocation map. It fails if neither source is available.
func (ps *PluginServer) GetRealAllocateDevices(pod v1.Pod, kltAllocate []string) ([]string, error) {
	fromMap, mapErr := ps.GetRealAllocateDevicesFromMap(kltAllocate)
	if mapErr == nil {
		return fromMap, nil
	}
	hwlog.RunLog.Warnf("get real allocate devices err: %v", mapErr)
	annotated, ok := pod.Annotations[api.PodAnnotationAscendReal]
	if !ok {
		return nil, errors.New("not found real allocate device")
	}
	fromAnnotation := strings.Split(annotated, common.CommaSepDev)
	ps.updateAllocMap(fromAnnotation, kltAllocate)
	return fromAnnotation, nil
}
// DestroyNotUsedVNPU destroys every virtual NPU on the node that no pod (from the full pod cache)
// is using. Individual destroy failures are only logged. Errors from listing the NPUs or resolving
// the pod devices are returned.
func (ps *PluginServer) DestroyNotUsedVNPU() error {
	allDevInfo, err := ps.manager.GetNPUs()
	if err != nil {
		return err
	}
	podDeviceInfo, err := ps.GetKltAndRealAllocateDev(ps.manager.GetKubeClient().GetAllPodListCache())
	if err != nil {
		return err
	}
	inUse := ps.removeVGroup(podDeviceInfo)
	for _, dev := range allDevInfo.AllDevs {
		name := dev.DeviceName
		if inUse.Has(name) || !common.IsVirtualDev(name) {
			continue
		}
		if destroyErr := ps.manager.DestroyVirtualDevice(name); destroyErr != nil {
			hwlog.RunLog.Infof("destroy virtual device %s failed, %v", name, destroyErr)
			continue
		}
		hwlog.RunLog.Infof("destroy virtual device %s success", name)
	}
	return nil
}
// removeVGroup collects the real devices used by the given pods. Each name is split on
// common.UnderLine. Names that split into 1 or common.VGroupAndDevLen parts contribute their
// first part (the name without its group suffix). Names with any other part count are ignored.
func (ps *PluginServer) removeVGroup(podDeviceInfo []*common.PodDeviceInfo) sets.String {
	result := sets.String{}
	for _, info := range podDeviceInfo {
		for _, dev := range info.RealDevice {
			parts := strings.Split(dev, common.UnderLine)
			if n := len(parts); n == 1 || n == common.VGroupAndDevLen {
				result.Insert(parts[0])
			}
		}
	}
	return result
}
// checkAnnotationAllocateValid reports whether pod's annotations match a kubelet request for
// requestDevices. A pod whose PodPredicateTime is already MaxUint64 (the value getOldestPod writes
// after picking it) is rejected. The checks are:
//   - preset mode with soft-share: the aicore quota annotation must equal the request size;
//   - preset mode otherwise: the annotated device count must equal the request size;
//   - dynamic vNPU annotation (contains "-"): the template's AI-core count must equal the request;
//   - dynamic whole-chip annotation: the request must equal (#chips × chipAICore).
func checkAnnotationAllocateValid(requestDevices []string, deviceType string, pod *v1.Pod, chipAICore int32) bool {
	mountedMarker := strconv.FormatUint(math.MaxUint64, common.BaseDec)
	if pod.Annotations[common.PodPredicateTime] == mountedMarker {
		hwlog.RunLog.Debugf("The pod has been mounted to a device, pod name: %s", pod.Name)
		return false
	}
	requested := len(requestDevices)
	if common.ParamOption.PresetVDevice {
		allocateDevice, err := common.GetDeviceFromPodAnnotation(pod, deviceType)
		if err != nil {
			return false
		}
		if !common.IsSupportSoftShareDevice() {
			return len(allocateDevice) == requested
		}
		quotaStr, hasQuota := pod.Annotations[api.SchedulerSoftShareDevAicoreQuotaKey]
		if !hasQuota {
			hwlog.RunLog.Warnf("pod %s has no aicore quota", pod.Name)
			return false
		}
		quota, convErr := strconv.Atoi(quotaStr)
		if convErr != nil {
			hwlog.RunLog.Warnf("pod %s has invalid aicore quota", pod.Name)
			return false
		}
		return quota == requested
	}
	annotation, err := common.GetPodAnnotationByDeviceType(pod, deviceType)
	if err != nil {
		hwlog.RunLog.Warn(err)
		return false
	}
	deviceInfos := strings.Split(annotation, common.MiddelLine)
	if len(deviceInfos) <= 1 {
		// whole chips: each listed physical device contributes chipAICore cores
		phyDevices := strings.Split(deviceInfos[0], common.CommaSepDev)
		return requested == len(phyDevices)*int(chipAICore)
	}
	_, template, err := common.GetVNPUSegmentInfo(deviceInfos)
	if err != nil {
		hwlog.RunLog.Warn(err)
		return false
	}
	aiCore, err := common.GetAICore(template)
	if err != nil {
		hwlog.RunLog.Warn(err)
		return false
	}
	return requested == aiCore
}
// getAICoreFromPodAnnotation turns the pod's volcano allocation annotation into device names to
// mount. Unused vNPUs are destroyed first.
//   - A vNPU annotation (contains "-") creates a virtual device from the segment info and returns
//     it, with ascendRuntimeOptions set to VirtualDev.
//   - Otherwise the comma-separated physical IDs are validated against the cached devices and
//     returned as "<name>-<id>", with ascendRuntimeOptions cleared.
//
// An unknown physical ID yields common.NoNPUResource as the error.
func (ps *PluginServer) getAICoreFromPodAnnotation(pod *v1.Pod, deviceType string) ([]string, error) {
	if err := ps.DestroyNotUsedVNPU(); err != nil {
		return nil, err
	}
	annotation, err := common.GetPodAnnotationByDeviceType(pod, deviceType)
	if err != nil {
		return nil, err
	}
	deviceInfos := strings.Split(annotation, common.MiddelLine)
	if len(deviceInfos) > 1 {
		phyID, templateName, err := common.GetVNPUSegmentInfo(deviceInfos)
		if err != nil {
			return nil, err
		}
		deviceName, err := ps.manager.CreateVirtualDevice(phyID, templateName)
		if err != nil {
			return nil, err
		}
		ps.ascendRuntimeOptions = common.VirtualDev
		return []string{deviceName}, nil
	}
	ps.ascendRuntimeOptions = ""
	ids := strings.Split(deviceInfos[0], common.CommaSepDev)
	// isValidRequestID reads ps.cachedDevices, which deepCopyDevice rewrites under cachedLock.
	// Hold the read lock so the validation does not race with a concurrent refresh.
	ps.cachedLock.RLock()
	inValidIDList := ps.isValidRequestID(ids)
	ps.cachedLock.RUnlock()
	if len(inValidIDList) != 0 {
		hwlog.RunLog.Errorf("volcano allocated id %s is invalid", inValidIDList)
		// errors.New: the message is not a format string
		return nil, errors.New(common.NoNPUResource)
	}
	phyDevs := make([]string, 0, len(ids))
	for _, id := range ids {
		phyDevs = append(phyDevs, fmt.Sprintf("%s-%s", ps.manager.GetName(), id))
	}
	return phyDevs, nil
}
// isValidRequestID returns, in input order, the physical IDs in phyDevs that do not match any
// cached device. It returns nil when all of them match.
func (ps *PluginServer) isValidRequestID(phyDevs []string) []string {
	var invalid []string
	for _, phyID := range phyDevs {
		if !ps.isValidPhyID(phyID) {
			invalid = append(invalid, phyID)
		}
	}
	return invalid
}
// isValidPhyID reports whether phyID equals the decimal form of some cached device's PhyID.
// ps.cachedDevices is read without locking here.
// NOTE(review): callers are expected to synchronise with deepCopyDevice; confirm at each call site.
func (ps *PluginServer) isValidPhyID(phyID string) bool {
	for i := range ps.cachedDevices {
		if strconv.Itoa(int(ps.cachedDevices[i].PhyID)) == phyID {
			return true
		}
	}
	return false
}
// doWithVolcanoSchedule finds the pod that volcano scheduled for this kubelet allocation and
// resolves the real devices from its annotation.
//
// Under ps.podLock it looks for active pods whose annotations match requestDevices. It polls the
// informer cache up to GetPodFromInformerTime-1 times, one second apart, and makes a final attempt
// against the API server. It then takes the oldest matching pod. The resulting devices are recorded
// in the allocation map.
//
// It returns the real devices, the soft-share NPU config dir ("" if none), and any error.
func (ps *PluginServer) doWithVolcanoSchedule(requestDevices []string) ([]string, string, error) {
	// The closure scopes podLock so that it is released on every return path via defer.
	oldestPod, err := func() (*v1.Pod, error) {
		ps.podLock.Lock()
		defer ps.podLock.Unlock()
		conditionFunc := func(pod *v1.Pod) bool {
			return checkAnnotationAllocateValid(requestDevices, ps.deviceType, pod, ps.manager.GetChipAICore())
		}
		var filteredPods []v1.Pod
		for i := 0; i < common.GetPodFromInformerTime; i++ {
			lastTry := i == common.GetPodFromInformerTime-1
			var allPods []v1.Pod
			if lastTry {
				// the final attempt bypasses the informer cache and queries the API server
				pods, err := ps.manager.GetKubeClient().GetActivePodList()
				if err != nil {
					hwlog.RunLog.Errorf("get active pod from api server failed, %v", err)
					return nil, err
				}
				allPods = pods
			} else {
				allPods = ps.manager.GetKubeClient().GetActivePodListCache()
			}
			filteredPods = common.FilterPods(allPods, ps.deviceType, conditionFunc)
			if len(filteredPods) != 0 {
				break
			}
			hwlog.RunLog.Warnf("no pod passed the filter, request device: %v, retry: %d", requestDevices, i)
			// no point waiting after the last attempt; the loop is about to end anyway
			if !lastTry {
				time.Sleep(time.Second)
			}
		}
		return ps.getOldestPod(filteredPods), nil
	}()
	if err != nil {
		return nil, "", err
	}
	if oldestPod == nil {
		return nil, "", fmt.Errorf("not get valid pod")
	}
	var allocateDevices []string
	if !common.ParamOption.PresetVDevice {
		common.LockAllDeviceInfo()
		allocateDevices, err = ps.getAICoreFromPodAnnotation(oldestPod, ps.deviceType)
		common.UnlockAllDeviceInfo()
	} else {
		allocateDevices, err = common.GetDeviceFromPodAnnotation(oldestPod, ps.deviceType)
	}
	if err != nil {
		return nil, "", err
	}
	hwlog.RunLog.Infof("vol found: %#v", allocateDevices)
	ps.updateAllocMap(allocateDevices, requestDevices)
	npuInfoConfigDir := ps.getNPUInfoConfigDirFromPod(oldestPod, allocateDevices)
	return allocateDevices, npuInfoConfigDir, nil
}
// getValidLogicDeviceID resolves devices to their visible device IDs (using the current
// ps.ascendRuntimeOptions) and returns the only one. Exactly one ID is required; any other count
// is an error.
func (ps *PluginServer) getValidLogicDeviceID(devices []string) (int, error) {
	_, visible, err := common.GetDeviceListID(devices, ps.ascendRuntimeOptions)
	if err != nil {
		return 0, fmt.Errorf("get device list ID failed: %w", err)
	}
	if n := len(visible); n != 1 {
		return 0, fmt.Errorf("visible devices length %d is not 1 (expected 1)", n)
	}
	return visible[0], nil
}
// extractPodAnnotations reads the soft-share aicore quota, HBM quota and scheduling policy
// annotations from pod. The policy value is converted with ConvertSchedulingPolicyToIntStr.
// If any annotation is absent or empty, a zero value is returned along with an error listing
// every such key, in the order aicore, hbm, policy.
func (ps *PluginServer) extractPodAnnotations(pod *v1.Pod) (softShareDevAnnotations, error) {
	var result softShareDevAnnotations
	required := []struct {
		key    string
		target *string
	}{
		{key: api.SchedulerSoftShareDevAicoreQuotaKey, target: &result.aicoreQuota},
		{key: api.SchedulerSoftShareDevHbmQuotaKey, target: &result.hbmQuota},
		{key: api.SchedulerSoftShareDevPolicyKey, target: &result.schedulingPolicy},
	}
	var missing []string
	for _, r := range required {
		// a missing key and an empty value are treated the same way
		value := pod.Annotations[r.key]
		switch {
		case value == "":
			missing = append(missing, r.key)
		case r.key == api.SchedulerSoftShareDevPolicyKey:
			*r.target = common.ConvertSchedulingPolicyToIntStr(value)
		default:
			*r.target = value
		}
	}
	if len(missing) != 0 {
		return softShareDevAnnotations{}, fmt.Errorf("missing or empty annotations: %v", missing)
	}
	return result, nil
}
// buildConfigDirPath returns
// <SoftShareDevNPUInfoConfigParentDirPath>/<namespace>.<jobName>/<logicID>_<vNPUId>, or "" (logged)
// when pod is nil or jobName / vNPUId is empty.
// NOTE(review): the visible caller passes the physical device ID as logicID.
func (ps *PluginServer) buildConfigDirPath(pod *v1.Pod, jobName string, logicID int, vNPUId string) string {
	if pod == nil || jobName == "" || vNPUId == "" {
		hwlog.RunLog.Error("invalid job name or vNPU ID")
		return ""
	}
	jobDir := pod.Namespace + "." + jobName
	deviceDir := strconv.Itoa(logicID) + "_" + vNPUId
	return filepath.Join(common.SoftShareDevNPUInfoConfigParentDirPath, jobDir, deviceDir)
}
// writeNPUConfigFile writes the non-empty annotation values as newline-separated "key=value"
// lines (no trailing newline) into <configDir>/<SoftShareDeviceConfigFileName>, using the
// default soft-share directory and file permissions. Empty values are skipped with a warning.
// It is an error if every value is empty or the write fails.
func (ps *PluginServer) writeNPUConfigFile(configDir string, annotations softShareDevAnnotations) error {
	items := []struct {
		key   string
		value string
	}{
		{api.SoftShareDeviceConfigPhysicalNPUId, annotations.physicalId},
		{api.SoftShareDeviceConfigVirtualNPUId, annotations.vNPUId},
		{api.SoftShareDeviceConfigAICoreQuota, annotations.aicoreQuota},
		{api.SoftShareDeviceConfigHbmQuota, annotations.hbmQuota},
		{api.SoftShareDeviceConfigShmId, annotations.dieId},
		{api.SoftShareDeviceConfigSchedulingPolicy, annotations.schedulingPolicy},
	}
	var content strings.Builder
	written := 0
	for _, it := range items {
		if it.value == "" {
			hwlog.RunLog.Warnf("config key %s has empty value, skip", it.key)
			continue
		}
		if written > 0 {
			content.WriteString("\n")
		}
		content.WriteString(it.key + "=" + it.value)
		written++
	}
	if written == 0 {
		return fmt.Errorf("no valid config items to write")
	}
	fileFullName := fmt.Sprintf("%s/%s", configDir, api.SoftShareDeviceConfigFileName)
	writeErr := common.WriteToFileWithPerm(content.String(), fileFullName, api.DefaultSoftShareDeviceConfigDirPerm,
		api.DefaultSoftShareDeviceConfigPerm)
	if writeErr != nil {
		return fmt.Errorf("write config to file %s failed: %w", fileFullName, writeErr)
	}
	hwlog.RunLog.Infof("successfully wrote NPU config to %s", fileFullName)
	return nil
}
// getNPUInfoConfigDirFromPod writes the soft-share NPU config file for a soft-share job pod and
// returns the directory it was written to. It returns "" (after logging) for a nil or non-soft-share
// pod, or when any step fails.
//
// The single visible device ID of devices is used as the physical ID. The virtual NPU ID is one more
// than common.GetMaxVirtualIDByPhysicalID for that ID. The die ID is obtained from the device manager
// through the corresponding logic ID.
func (ps *PluginServer) getNPUInfoConfigDirFromPod(pod *v1.Pod, devices []string) string {
	if pod == nil {
		hwlog.RunLog.Error("pod is nil")
		return ""
	}
	if !common.IsSoftShareDevJob(pod) {
		hwlog.RunLog.Warn("pod is not share device job")
		return ""
	}
	cfg, err := ps.extractPodAnnotations(pod)
	if err != nil {
		hwlog.RunLog.Errorf("extract pod annotations failed: %v", err)
		return ""
	}
	phyID, err := ps.getValidLogicDeviceID(devices)
	if err != nil {
		hwlog.RunLog.Errorf("get valid logic device ID failed: %v", err)
		return ""
	}
	cfg.physicalId = strconv.Itoa(phyID)
	maxVirtualID, err := common.GetMaxVirtualIDByPhysicalID(phyID)
	if err != nil {
		hwlog.RunLog.Errorf("get max virtual id by physical id %d failed: %v", phyID, err)
		return ""
	}
	// guard the +1 below against overflow
	if maxVirtualID >= math.MaxInt {
		hwlog.RunLog.Errorf("maxVirtualID reaches int type upper limit %v, cannot increment", math.MaxInt)
		return ""
	}
	cfg.vNPUId = strconv.Itoa(maxVirtualID + 1)
	configDir := ps.buildConfigDirPath(pod, common.GetJobNameOfPod(pod), phyID, cfg.vNPUId)
	if configDir == "" {
		return ""
	}
	logicID, err := ps.manager.GetDmgr().GetLogicIDFromPhysicID(int32(phyID))
	if err != nil {
		hwlog.RunLog.Errorf("get logic id failed, physical id: %d err: %v", phyID, err)
		return ""
	}
	dieID, err := ps.manager.GetDmgr().GetDieID(logicID, dcmi.VDIE)
	if err != nil {
		hwlog.RunLog.Errorf("get die id failed, logic id: %d err: %v", logicID, err)
		return ""
	}
	cfg.dieId = dieID
	if err = ps.writeNPUConfigFile(configDir, cfg); err != nil {
		hwlog.RunLog.Errorf("write NPU config file failed: %v", err)
		return ""
	}
	return configDir
}
// useVolcano resolves the devices for a volcano-scheduled allocation. Virtual device types use
// the kubelet-requested IDs directly, with no config dir. Otherwise resolution is delegated to
// doWithVolcanoSchedule.
func (ps *PluginServer) useVolcano(requestDevices []string) ([]string, string, error) {
	if !common.IsVirtualDev(ps.deviceType) {
		return ps.doWithVolcanoSchedule(requestDevices)
	}
	return requestDevices, "", nil
}
func getDevPath(id, ascendRuntimeOptions string) (string, string) {
containerPath := fmt.Sprintf("%s%s", "/dev/davinci", id)
hostPath := containerPath
if ascendRuntimeOptions == common.VirtualDev {
hostPath = fmt.Sprintf("%s%s", "/dev/vdavinci", id)
}
return containerPath, hostPath
}
// mountDevice adds one read-write device spec per device ID to resp. When at least one NPU was
// added, it also adds the UB devices.
func mountDevice(resp *v1beta1.ContainerAllocateResponse, devices []int, ascendRuntimeOptions string) {
	for _, id := range devices {
		ctrPath, hostPath := getDevPath(strconv.Itoa(id), ascendRuntimeOptions)
		spec := &v1beta1.DeviceSpec{HostPath: hostPath, ContainerPath: ctrPath, Permissions: "rw"}
		resp.Devices = append(resp.Devices, spec)
	}
	if len(devices) == 0 {
		return
	}
	mountUBDevice(resp)
}
// mountDefaultDevice adds a read-write device spec for each default device path. The container
// path is chosen by getDeviceContainerPath.
func mountDefaultDevice(resp *v1beta1.ContainerAllocateResponse, defaultDevs []string) {
	for i := range defaultDevs {
		host := defaultDevs[i]
		spec := &v1beta1.DeviceSpec{
			HostPath:      host,
			ContainerPath: getDeviceContainerPath(host),
			Permissions:   "rw",
		}
		resp.Devices = append(resp.Devices, spec)
	}
}
// mountUBDevice adds every regular entry of the uburma and ummu device directories to resp.
// Directories that do not exist (os.Stat fails) are skipped silently, and read failures are
// only logged.
func mountUBDevice(resp *v1beta1.ContainerAllocateResponse) {
	ubDirs := []struct {
		path   string
		errFmt string
		okMsg  string
	}{
		{common.UburmaDevicePath, "read uburma devices directory error: %v",
			"uburma devices exist, add uburma devices to spec"},
		{common.UmmuDevicePath, "read ummu devices directory error: %v",
			"ummu devices exist, add ummu devices to spec"},
	}
	for _, dir := range ubDirs {
		if _, statErr := os.Stat(dir.path); statErr != nil {
			continue
		}
		if err := addDevicesInDir(resp, dir.path); err != nil {
			hwlog.RunLog.Warnf(dir.errFmt, err)
			continue
		}
		hwlog.RunLog.Info(dir.okMsg)
	}
}
// addDevicesInDir appends a read-write DeviceSpec for every non-directory entry of
// dirPath to resp.Devices, exposing each entry at the same path inside the container.
// If dirPath cannot be read, resp is left unchanged and an error wrapping the
// os.ReadDir failure is returned.
func addDevicesInDir(resp *v1beta1.ContainerAllocateResponse, dirPath string) error {
	entries, err := os.ReadDir(dirPath)
	if err != nil {
		// %w renders identically to %v but keeps the underlying error available
		// to errors.Is / errors.As for callers.
		return fmt.Errorf("read device dir %v err:%w", dirPath, err)
	}
	for _, entry := range entries {
		// Sub-directories are not device nodes; only flat entries are mounted.
		if entry.IsDir() {
			continue
		}
		devPath := filepath.Join(dirPath, entry.Name())
		resp.Devices = append(resp.Devices, &v1beta1.DeviceSpec{
			HostPath:      devPath,
			ContainerPath: devPath,
			Permissions:   "rw",
		})
	}
	return nil
}
// getDeviceContainerPath maps a host device path to the path used inside the
// container. Only common.HiAIManagerDeviceDocker is remapped, to
// common.HiAIManagerDevice; every other path is returned unchanged.
func getDeviceContainerPath(hostPath string) string {
	switch hostPath {
	case common.HiAIManagerDeviceDocker:
		return common.HiAIManagerDevice
	default:
		return hostPath
	}
}
// setNPUDeviceMount configures how the visible NPUs reach the container.
// With UseAscendDocker enabled, only runtime env vars are set (ascend-docker does
// the mounting). Otherwise the default devices and each NPU device node are added
// to resp directly.
func (ps *PluginServer) setNPUDeviceMount(resp *v1beta1.ContainerAllocateResponse, ascendVisibleDevices []int) {
	if common.ParamOption.UseAscendDocker {
		common.SetAscendRuntimeEnv(ascendVisibleDevices, ps.ascendRuntimeOptions, resp)
		hwlog.RunLog.Info("device-plugin will use ascend-docker to mount")
		return
	}
	hwlog.RunLog.Info("device-plugin will use origin mount way")
	mountDefaultDevice(resp, ps.defaultDevs)
	mountDevice(resp, ascendVisibleDevices, ps.ascendRuntimeOptions)
}
// mountShareDeviceConfig adds the soft-share mounts for a single-device request:
// a writable share-memory config file at
// <SoftShareDevConfigDir>/<physicID>/<dieID> (created if missing). When
// npuInfoConfigDir is non-empty, that directory is also mounted read-only.
// Nothing is mounted if soft sharing is unsupported, if devices does not hold
// exactly one id, or if the die id lookup or file creation fails (each is logged).
func (ps *PluginServer) mountShareDeviceConfig(resp *v1beta1.ContainerAllocateResponse, devices []int, npuInfoConfigDir string) {
	if !common.IsSupportSoftShareDevice() {
		hwlog.RunLog.Warnf("not support soft share device plugin")
		return
	}
	if len(devices) != 1 {
		hwlog.RunLog.Error("deviceIDs length is not equal to 1")
		return
	}
	phyID := devices[0]
	dieID, err := ps.getDieIDFromPhysicID(phyID)
	if err != nil {
		hwlog.RunLog.Errorf("get dieID from physicID %d failed: %v", phyID, err)
		return
	}
	hostCfgFile := filepath.Join(common.ParamOption.SoftShareDevConfigDir, strconv.Itoa(phyID), dieID)
	if err = common.CreateFileIfNotExist(hostCfgFile, common.DefaultPerm, common.DefaultPerm); err != nil {
		hwlog.RunLog.Errorf("create share memory config file failed: %v", err)
		return
	}
	extra := []*v1beta1.Mount{{
		ContainerPath: filepath.Join(common.SoftShareDevConfigDirContainerPath, dieID),
		HostPath:      hostCfgFile,
		ReadOnly:      false,
	}}
	if npuInfoConfigDir != "" {
		extra = append(extra, &v1beta1.Mount{
			ContainerPath: common.SoftShareDevNPUInfoConfigDirContainerPath,
			HostPath:      npuInfoConfigDir,
			ReadOnly:      true,
		})
	}
	resp.Mounts = append(resp.Mounts, extra...)
}
// getDieIDFromPhysicID converts a physical NPU id to its logic id, then queries the
// VDIE die id for that logic id through the device manager. Each failure is logged
// and returned unchanged.
func (ps *PluginServer) getDieIDFromPhysicID(physicID int) (string, error) {
	logicID, err := ps.manager.GetDmgr().GetLogicIDFromPhysicID(int32(physicID))
	if err != nil {
		hwlog.RunLog.Errorf("get logic id from physicID id: %d failed, err: %v", physicID, err)
		return "", err
	}
	dieID, err := ps.manager.GetDmgr().GetDieID(logicID, dcmi.VDIE)
	if err == nil {
		return dieID, nil
	}
	hwlog.RunLog.Errorf("get die id from logic id: %d failed, err: %v", logicID, err)
	return "", err
}
// convertToLogicIDs maps each physical id in devices to the LogicID of the first
// entry in allDevs with a matching PhyID, preserving the order of devices.
// Unknown physical ids are logged and dropped. The result is nil if nothing matched.
func convertToLogicIDs(devices []int, allDevs []common.NpuDevice) []int {
	lookup := func(phyID int) (int, bool) {
		for i := range allDevs {
			if allDevs[i].PhyID == int32(phyID) {
				return int(allDevs[i].LogicID), true
			}
		}
		return 0, false
	}
	var logicIDs []int
	for _, phyID := range devices {
		logicID, ok := lookup(phyID)
		if !ok {
			hwlog.RunLog.Warnf("cannot find logicID for physicID %d", phyID)
			continue
		}
		logicIDs = append(logicIDs, logicID)
	}
	return logicIDs
}
// getFinalVisibleDevices returns the device ids to expose to the container.
// On Ascend910A5 cards, when ids did not come from pod annotations, the physical
// ids are translated to logic ids; otherwise ascendVisibleDevices is returned as-is.
func getFinalVisibleDevices(ascendVisibleDevices []int, allNPUInfo common.NpuAllInfo, usePodAnnotation bool) []int {
	needLogicIDs := !usePodAnnotation && common.ParamOption.RealCardType == api.Ascend910A5
	if !needLogicIDs {
		return ascendVisibleDevices
	}
	return convertToLogicIDs(ascendVisibleDevices, allNPUInfo.AllDevs)
}
// Allocate implements the kubelet device-plugin Allocate RPC.
//
// For every container request it:
//   - maps custom device names back to inner names,
//   - optionally (when UseVolcanoType is set and the request is not a full preset
//     allocation) resolves the real devices through useVolcano,
//   - converts the device list to visible ids,
//   - fills the response with soft-share mounts, device mounts/env, HCCL topo env
//     and slow-node env.
//
// The first error encountered aborts the whole call and is returned to kubelet.
func (ps *PluginServer) Allocate(ctx context.Context, requests *v1beta1.AllocateRequest) (*v1beta1.AllocateResponse,
	error) {
	if err := ps.checkAllocateRequest(requests); err != nil {
		hwlog.RunLog.Error(err)
		return nil, err
	}
	allNPUInfo, err := ps.manager.GetNPUs()
	if err != nil {
		hwlog.RunLog.Errorf("get all npus info failed: %v", err)
		return nil, err
	}
	resps := new(v1beta1.AllocateResponse)
	for _, rqt := range requests.ContainerRequests {
		allocateDevices := customname.ReplaceDeviceInnerName(ps.deviceType, rqt.DevicesIDs)
		switch {
		case !common.ParamOption.PresetVDevice:
			hwlog.RunLog.Infof("request num: %d", len(rqt.DevicesIDs))
		case common.IsSupportSoftShareDevice():
			// In soft-share mode each requested id represents one unit of AICore quota.
			hwlog.RunLog.Infof("request aicore quota: %d", len(rqt.DevicesIDs))
		default:
			hwlog.RunLog.Infof("request: %#v", rqt.DevicesIDs)
		}
		hwlog.RunLog.Debugf("len(allocateDevices)=%d, len(allNPUInfo.AllDevs)=%d",
			len(allocateDevices), len(allNPUInfo.AllDevs))

		var npuInfoConfigDir string
		usePodAnnotation := (len(allocateDevices) != len(allNPUInfo.AllDevs) || !common.ParamOption.PresetVDevice) &&
			common.ParamOption.UseVolcanoType
		if usePodAnnotation {
			// The function-level err is reused here; the previous per-iteration
			// `var err error` only shadowed it.
			allocateDevices, npuInfoConfigDir, err = ps.useVolcano(rqt.DevicesIDs)
			if err != nil {
				hwlog.RunLog.Error(err)
				return nil, err
			}
		}

		var ascendVisibleDevices []int
		_, ascendVisibleDevices, err = common.GetDeviceListID(allocateDevices, ps.ascendRuntimeOptions)
		if err != nil {
			hwlog.RunLog.Error(err)
			return nil, err
		}
		finalVisibleDevices := getFinalVisibleDevices(ascendVisibleDevices, allNPUInfo, usePodAnnotation)

		resp := new(v1beta1.ContainerAllocateResponse)
		ps.mountShareDeviceConfig(resp, finalVisibleDevices, npuInfoConfigDir)
		ps.setNPUDeviceMount(resp, finalVisibleDevices)
		ps.setHcclTopoFilePathEnv(resp, allNPUInfo)
		ps.SetSlowNodeNoticeEnv(resp)
		resps.ContainerResponses = append(resps.ContainerResponses, resp)
	}
	return resps, nil
}
// SetSlowNodeNoticeEnv injects the perf-dump path and config env vars into resp,
// read from the common.SlowNodeNoticeCMName ConfigMap in api.KubeNS.
// It does nothing when slow-node detection is disabled. resp == nil is logged as
// an error. A missing ConfigMap is logged at debug level and skipped. A ConfigMap
// value that fails to unmarshal is logged, and no env vars are set.
func (ps *PluginServer) SetSlowNodeNoticeEnv(resp *v1beta1.ContainerAllocateResponse) {
	if !common.ParamOption.EnableSlowNode {
		return
	}
	if resp == nil {
		hwlog.RunLog.Error("resp is nil")
		return
	}
	// Only allocate when absent; an existing (even empty) map is reused.
	if resp.Envs == nil {
		resp.Envs = make(map[string]string, common.SlowNodeStepTimeEnvNum)
	}
	configMap, err := ps.manager.GetKubeClient().GetConfigMap(common.SlowNodeNoticeCMName, api.KubeNS)
	if err != nil {
		hwlog.RunLog.Debugf("cannot find '%s' configmap, reason: %v", common.SlowNodeNoticeCMName, err)
		return
	}
	var stepTimeInfo stepTimeCM
	// NOTE(review): every value is decoded into the same struct and map iteration
	// order is random, so the result is non-deterministic if the ConfigMap ever
	// carries more than one key. Presumably it holds exactly one; confirm.
	for _, val := range configMap.Data {
		if err = json.Unmarshal([]byte(val), &stepTimeInfo); err != nil {
			hwlog.RunLog.Errorf("step time configmap unmarshal error: %v", err)
			return
		}
	}
	resp.Envs[common.PerfDumpPathEnv] = stepTimeInfo.Data.PerfDumpPath
	resp.Envs[common.PerfDumpConfigEnv] = stepTimeInfo.Data.PerfDumpConfig
	hwlog.RunLog.Info("allocate step time env succeed")
}
// GetPreferredAllocation is part of the device-plugin gRPC service but is not
// implemented by this plugin; it always returns a "not support" error.
func (ps *PluginServer) GetPreferredAllocation(context.Context, *v1beta1.PreferredAllocationRequest) (
	*v1beta1.PreferredAllocationResponse, error) {
	return nil, errors.New("not support")
}
// GetDevicePluginOptions reports the plugin's options to kubelet. A zero-valued
// DevicePluginOptions is returned, so no optional capability is advertised.
func (ps *PluginServer) GetDevicePluginOptions(ctx context.Context, e *v1beta1.Empty) (*v1beta1.DevicePluginOptions,
	error) {
	opts := new(v1beta1.DevicePluginOptions)
	return opts, nil
}
// PreStartContainer satisfies the device-plugin service interface. It only logs
// and returns an empty response.
func (ps *PluginServer) PreStartContainer(ctx context.Context,
	r *v1beta1.PreStartContainerRequest) (*v1beta1.PreStartContainerResponse, error) {
	reply := new(v1beta1.PreStartContainerResponse)
	hwlog.RunLog.Info("PreStart just call in UT.")
	return reply, nil
}
// NewPluginServer builds a PluginServer for deviceType backed by manager.
// The server starts in the not-running state with restart enabled and a zero
// restart counter. Its device cache is seeded with a deep copy of devices.
func NewPluginServer(deviceType string, devices []*common.NpuDevice, defaultDevs []string,
	manager device.DevManager) *PluginServer {
	srv := &PluginServer{
		restart:     true,
		deviceType:  deviceType,
		defaultDevs: defaultDevs,
		manager:     manager,
		// reciChan holds one pending notification; stop is an unbuffered signal.
		reciChan:       make(chan interface{}, 1),
		stop:           make(chan interface{}),
		klt2RealDevMap: make(map[string]string, common.MaxDevicesNum),
		isRunning:      common.NewAtomicBool(false),
		deviceSyncStat: common.NewSendStats(common.DefaultSendRecordLength),
		podLock:        sync.Mutex{},
	}
	srv.restartTimes.Store(0)
	srv.deepCopyDevice(devices)
	return srv
}