package profiling
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"ascend-common/common-utils/hwlog"
"clusterd/pkg/application/config"
"clusterd/pkg/common/constant"
"clusterd/pkg/common/logs"
"clusterd/pkg/domain/common"
"clusterd/pkg/domain/job"
"clusterd/pkg/domain/podgroup"
"clusterd/pkg/domain/profile"
"clusterd/pkg/interface/grpc/profiling"
"clusterd/pkg/interface/kube"
)
const (
// PartsOfJobNs is the number of "/"-separated parts a jobNsName must split
// into, i.e. the namespace and the job name.
PartsOfJobNs = 2
)
// SwitchManager serves the TrainingDataTrace gRPC methods defined in this file
// and keeps a table of per-job publishers used to push profiling switch
// updates to subscribers.
type SwitchManager struct {
profiling.UnimplementedTrainingDataTraceServer
// publishers maps a job id to the publisher currently registered for it.
publishers map[string]*config.ConfigPublisher[*profiling.DataStatusRes]
// lock guards every access to publishers.
lock sync.RWMutex
// ctx is passed to each publisher created by preemptPublisher.
ctx context.Context
}
// NewSwitchManager returns a SwitchManager with an empty publisher table.
// The supplied ctx is stored and later handed to every publisher the manager
// creates.
func NewSwitchManager(ctx context.Context) *SwitchManager {
	manager := new(SwitchManager)
	manager.ctx = ctx
	// The zero value of the embedded lock is ready to use; only the map needs
	// explicit initialisation.
	manager.publishers = map[string]*config.ConfigPublisher[*profiling.DataStatusRes]{}
	return manager
}
// Response codes placed in the Code field of the responses built in this file.
const (
// ErrInvalidParam is used when jobNsName is not of the form namespace/jobName.
ErrInvalidParam = 300
// ErrNotFound is used when the data trace configmap, or its profiling switch
// field, cannot be obtained.
ErrNotFound = 404
// OK is used when the request succeeded.
OK = 200
// ErrServerFault is used when creating, updating or decoding the configmap
// fails, or when notifying the subscriber fails on the gRPC path.
ErrServerFault = 500
)
// parseAndValidateJobNsName extracts the namespace and job name from the
// request's JobNsName, which must have the form "namespace/jobName".
//
// It returns (namespace, jobName, nil) on success. When the value does not
// consist of exactly PartsOfJobNs "/"-separated parts, both strings are empty
// and an error is returned.
func parseAndValidateJobNsName(in *profiling.DataTypeReq) (string, string, error) {
	raw := in.GetJobNsName()
	// N separators always produce N+1 parts, so counting separators is
	// equivalent to splitting and checking the number of parts.
	if strings.Count(raw, "/") != PartsOfJobNs-1 {
		return "", "", fmt.Errorf("the format of jobNsName is not namespace/jobName")
	}
	namespace, name, _ := strings.Cut(raw, "/")
	return namespace, name, nil
}
// sendProfilingSwitchByCm delivers the requested profiling switch through the
// job's data trace configmap: the configmap is created when the lookup reports
// "not found" and updated when it already exists.
//
// Subscribers are notified first on a best-effort basis; a notification
// failure is only logged and does not prevent the configmap write.
//
// The response carries OK on success, ErrNotFound when the configmap lookup
// fails for any reason other than "not found", and ErrServerFault when the
// create or update fails. Every non-OK response is returned together with a
// non-nil error, so callers can rely on the error alone to detect failure.
func (ps *SwitchManager) sendProfilingSwitchByCm(
	dtc *profile.DataTraceController, in *profiling.DataTypeReq) (*profiling.DataTypeRes, error) {
	if notifyErr := ps.notifySubscriber(dtc, in); notifyErr != nil {
		hwlog.RunLog.Warnf("sendProfilingSwitchByCm notify subscribe failed: %v", notifyErr)
	}
	cmName := profile.DataTraceCmPrefix + dtc.JobName
	owner := getPGOwner(dtc.JobNamespace, dtc.JobName)

	cm, err := kube.GetConfigMap(cmName, dtc.JobNamespace)
	if cm != nil && err == nil {
		// The configmap already exists: overwrite the switch in place.
		if updateCmErr := dtc.UpdateDataTraceCm(in.ProfilingSwitch, owner); updateCmErr != nil {
			hwlog.RunLog.Errorf("sendProfilingSwitchByCm update cm failed: %v", updateCmErr)
			return &profiling.DataTypeRes{Message: fmt.Sprintf("failed to update comfigmap:[%s/%s]",
					dtc.JobNamespace, dtc.JobName), Code: ErrServerFault},
				fmt.Errorf("update cm err:%v", updateCmErr)
		}
		return &profiling.DataTypeRes{Message: "successfully changed profiling marker enable status", Code: OK}, nil
	}

	if !errors.IsNotFound(err) {
		if err == nil {
			// A nil configmap with a nil error is still a failed lookup. Without
			// this the failure response would be returned with a nil error and
			// the caller would treat the operation as successful.
			err = fmt.Errorf("get configmap %s/%s returned nil without an error", dtc.JobNamespace, cmName)
		}
		hwlog.RunLog.Errorf("sendProfilingSwitchByCm get cm failed: %v", err)
		return &profiling.DataTypeRes{Message: fmt.Sprintf("failed to found configmap:[%s/%s]",
			dtc.JobNamespace, dtc.JobName), Code: ErrNotFound}, err
	}

	// The lookup reported "not found": create the configmap with the switch.
	if createErr := dtc.CreateDataTraceCm(in.ProfilingSwitch, owner); createErr != nil {
		hwlog.RunLog.Errorf("sendProfilingSwitchByCm create cm failed: %v", createErr)
		return &profiling.DataTypeRes{Message: fmt.Sprintf("failed to create comfigmap:[%s/%s]",
				dtc.JobNamespace, dtc.JobName), Code: ErrServerFault},
			fmt.Errorf("create cm err:%v", createErr)
	}
	return &profiling.DataTypeRes{Message: "successfully changed profiling marker enable status", Code: OK}, nil
}
// sendProfilingSwitchByGrpc delivers the requested profiling switch solely by
// notifying the job's subscriber. Unlike the configmap path, a notification
// failure here is fatal: it yields an ErrServerFault response and an error.
func (ps *SwitchManager) sendProfilingSwitchByGrpc(
	dtc *profile.DataTraceController, in *profiling.DataTypeReq) (*profiling.DataTypeRes, error) {
	if notifyErr := ps.notifySubscriber(dtc, in); notifyErr != nil {
		hwlog.RunLog.Errorf("sendProfilingSwitchByGrpc notify subscribe failed: %v", notifyErr)
		failure := &profiling.DataTypeRes{Message: "notify subscribe failed", Code: ErrServerFault}
		return failure, fmt.Errorf("notify subscribe err %v", notifyErr)
	}
	success := &profiling.DataTypeRes{Message: "successfully changed profiling marker enable status", Code: OK}
	return success, nil
}
// ModifyTrainingDataTraceSwitch applies the profiling switch carried by the
// request to the job named by its JobNsName ("namespace/jobName").
//
// When the job's pods mount the profiling configmap path the switch is written
// through the configmap; otherwise it is pushed to the subscriber over gRPC.
// The operation is recorded at start and again on return with its outcome.
//
// It returns an ErrInvalidParam response and an error when JobNsName is
// malformed; otherwise it returns whatever the chosen delivery path produced.
func (ps *SwitchManager) ModifyTrainingDataTraceSwitch(ctx context.Context,
	in *profiling.DataTypeReq) (*profiling.DataTypeRes, error) {
	event := "modify profiling marker status"
	logs.RecordLog("", event, constant.Start)
	res := constant.Failed
	defer func() {
		logs.RecordLog("", event, res)
	}()

	jobNs, jobName, err := parseAndValidateJobNsName(in)
	if err != nil {
		return &profiling.DataTypeRes{Message: err.Error(), Code: ErrInvalidParam}, err
	}
	dtc := profile.NewDataTraceController(jobNs, jobName)

	var response *profiling.DataTypeRes
	// channel names the delivery path actually used so the failure log below
	// does not blame the configmap path for a gRPC failure.
	channel := "cm"
	mounted, mountErr := dtc.IsPodsMountProfilingCmPath()
	if mounted {
		response, err = ps.sendProfilingSwitchByCm(dtc, in)
	} else {
		channel = "grpc"
		hwlog.RunLog.Warnf("job not mount profiling cm path: %v", mountErr)
		response, err = ps.sendProfilingSwitchByGrpc(dtc, in)
	}
	if err != nil {
		hwlog.RunLog.Errorf("send profiling switch by %s failed: %v", channel, err)
		return response, err
	}

	res = constant.Success
	hwlog.RunLog.Infof("successfully changed profiling marker enable status: %#v", in.ProfilingSwitch)
	return response, nil
}
// getPGOwner resolves the owner reference for the given job by looking up the
// job, then its pod group, and asking the pod group for its owner.
// When the owner cannot be derived the failure is logged and an empty
// OwnerReference is returned.
func getPGOwner(jobNs, jobName string) v1.OwnerReference {
	// Note the lookup takes the job name first and the namespace second.
	jobKey := job.GetJobByNameSpaceAndName(jobName, jobNs).Key
	pg := podgroup.GetPodGroup(jobKey)
	ownerRef, err := podgroup.GetOwnerRefByPG(&pg)
	if err == nil {
		return ownerRef
	}
	hwlog.RunLog.Errorf("get owner from pg failed, error: %v", err)
	return v1.OwnerReference{}
}
// notifySubscriber publishes the request's profiling switch to the publisher
// registered for the running job identified by dtc. It returns an error when
// no running job with a non-empty key is found or when publishing fails.
func (ps *SwitchManager) notifySubscriber(dtc *profile.DataTraceController, in *profiling.DataTypeReq) error {
	running := job.GetRunningJobByNameSpaceAndName(dtc.JobName, dtc.JobNamespace)
	if running.Key == "" {
		return fmt.Errorf("no such job")
	}
	return ps.publish(running.Key, in.ProfilingSwitch)
}
// publish hands the given profiling switch to the publisher registered for
// jobId, wrapped in an "update profiling switch" message with code OK.
//
// It returns an error when no usable publisher is registered for the job or
// when the publisher reports that the data could not be saved. A nil entry in
// the publisher table is treated the same as a missing one, matching the nil
// checks used elsewhere in this file.
func (ps *SwitchManager) publish(jobId string, info *profiling.ProfilingSwitch) error {
	const (
		publisherNotFoundFmt = "publisher for job %s not found"
		saveSwitchFailedFmt  = "send update profiling switch %v for job %v fail"
	)

	publisher, ok := ps.getPublisher(jobId)
	if !ok || publisher == nil {
		hwlog.RunLog.Errorf(publisherNotFoundFmt, jobId)
		return fmt.Errorf(publisherNotFoundFmt, jobId)
	}

	updateMsg := profiling.DataStatusRes{
		Message:         "update profiling switch",
		ProfilingSwitch: info,
		Code:            OK,
	}
	if saved := publisher.SaveData(jobId, &updateMsg); !saved {
		hwlog.RunLog.Errorf(saveSwitchFailedFmt, info, jobId)
		return fmt.Errorf(saveSwitchFailedFmt, info, jobId)
	}
	return nil
}
// GetTrainingDataTraceSwitch reports the profiling switch currently stored in
// the data trace configmap of the job named by the request's JobNsName
// ("namespace/jobName").
//
// Outcomes, as encoded in the response Code:
//   - ErrInvalidParam (nil error): JobNsName is malformed.
//   - ErrNotFound: the configmap could not be obtained (the lookup error, if
//     any, is returned), or it lacks the profiling switch key (nil error).
//   - ErrServerFault (with error): the stored value is not valid JSON for a
//     ProfilingSwitch.
//   - OK (nil error): the decoded switch is attached to the response.
//
// The operation is recorded at start and again on return with its outcome.
func (ps *SwitchManager) GetTrainingDataTraceSwitch(ctx context.Context,
	in *profiling.DataStatusReq) (*profiling.DataStatusRes, error) {
	event := "get profiling switch info"
	logs.RecordLog("", event, constant.Start)
	res := constant.Failed
	defer func() {
		logs.RecordLog("", event, res)
	}()

	parts := strings.Split(in.GetJobNsName(), "/")
	if len(parts) != PartsOfJobNs {
		badFormat := &profiling.DataStatusRes{
			Message: "the format of jobNsName is not namespace/jobName",
			Code:    ErrInvalidParam,
		}
		return badFormat, nil
	}
	dtc := profile.NewDataTraceController(parts[0], parts[1])

	cm, err := dtc.IsDataTraceCmExist()
	if err != nil || cm == nil {
		hwlog.RunLog.Errorf("can not find data trace configmap[%s/%s]", dtc.JobNamespace, dtc.JobName)
		missingCm := &profiling.DataStatusRes{
			Message: fmt.Sprintf("failed to found configmap:[%s/%s]", dtc.JobNamespace, dtc.JobName),
			Code:    ErrNotFound,
		}
		return missingCm, err
	}

	rawSwitch, present := cm.Data[profile.DataTraceCmProfilingSwitchKey]
	if !present {
		hwlog.RunLog.Infof("data trace configmap[%s/%s] has no %s field",
			dtc.JobNamespace, dtc.JobName, profile.DataTraceCmProfilingSwitchKey)
		missingField := &profiling.DataStatusRes{
			Message: "data trace configmap does not contain the 'profilingSwitch' field",
			Code:    ErrNotFound,
		}
		return missingField, nil
	}

	var resProfile profiling.ProfilingSwitch
	decodeErr := json.Unmarshal([]byte(rawSwitch), &resProfile)
	if decodeErr != nil {
		hwlog.RunLog.Errorf("failed to unmarshal configmap[%s/%s], err:%v", dtc.JobNamespace, dtc.JobName, decodeErr)
		undecodable := &profiling.DataStatusRes{
			Message: fmt.Sprintf("failed to convert configmap:[%s/%s]", dtc.JobNamespace, dtc.JobName),
			Code:    ErrServerFault,
		}
		return undecodable, decodeErr
	}

	res = constant.Success
	found := &profiling.DataStatusRes{
		Message:         fmt.Sprintf("successfully get the status of job[%s/%s]", dtc.JobNamespace, dtc.JobName),
		ProfilingSwitch: &resProfile,
		Code:            OK,
	}
	return found, nil
}
// SubscribeDataTraceSwitch registers the calling client as the subscriber for
// profiling switch updates of clientInfo.JobId and serves the stream.
//
// When no publisher is registered for the job yet, the client must first pass
// preRegistry; on failure an error is returned and nothing is registered.
// Any existing publisher for the job is then stopped and replaced by a fresh
// one, which serves the stream via ListenDataChange (presumably blocking until
// the stream ends or the publisher is stopped; confirm against ConfigPublisher).
// Afterwards the publisher is removed from the table unless a newer one has
// already taken its place.
//
// The operation is recorded under the client's role both at start and on
// return.
func (ps *SwitchManager) SubscribeDataTraceSwitch(
	clientInfo *profiling.ProfilingClientInfo, stream profiling.TrainingDataTrace_SubscribeDataTraceSwitchServer) error {
	event := "subscribe profiling message signal"
	logs.RecordLog(clientInfo.Role, event, constant.Start)
	res := constant.Failed
	defer func() {
		// Use the same role as the start record so both entries can be paired.
		logs.RecordLog(clientInfo.Role, event, res)
	}()
	hwlog.RunLog.Infof("receive Subscribe profiling message signal request, %v", clientInfo)

	if existing, ok := ps.getPublisher(clientInfo.JobId); !ok || existing == nil {
		if _, err := ps.preRegistry(clientInfo); err != nil {
			hwlog.RunLog.Errorf("jobId=%s, preCheck err:%v", clientInfo.JobId, err)
			return fmt.Errorf("jobId=%s not registered, role=%s", clientInfo.JobId, clientInfo.Role)
		}
	}

	publisher := ps.preemptPublisher(clientInfo.JobId)
	publisher.ListenDataChange(stream)

	// Only the publisher created by this call is removed; deletePublisher
	// compares create times so a newer subscriber's publisher is left intact.
	createTime := publisher.GetCreateTime()
	ps.deletePublisher(clientInfo.JobId, createTime)
	hwlog.RunLog.Infof("jobId=%s stop subscribe profiling message signal, createTime=%v",
		clientInfo.JobId, createTime.UnixNano())
	res = constant.Success
	return nil
}
// getPublisher looks up the publisher registered for jobId under the read
// lock. The boolean reports whether the table holds an entry for the job.
func (ps *SwitchManager) getPublisher(jobId string) (*config.ConfigPublisher[*profiling.DataStatusRes], bool) {
	ps.lock.RLock()
	entry, found := ps.publishers[jobId]
	ps.lock.RUnlock()
	return entry, found
}
// preRegistry decides whether a new subscriber may be admitted for req.JobId.
//
// It returns common.JobNotExist when the job is neither in the job cache nor
// resolvable to a namespace for the given role, common.OutOfMaxServeJobs when
// the publisher table already holds constant.MaxServeJobs entries or more, and
// common.OK otherwise. Every non-OK code is accompanied by an error.
func (ps *SwitchManager) preRegistry(req *profiling.ProfilingClientInfo) (common.RespCode, error) {
	_, cached := job.GetJobCache(req.JobId)
	_, nsErr := job.GetNamespaceByJobIdAndAppType(req.JobId, req.Role)
	switch {
	case !cached && nsErr != nil:
		hwlog.RunLog.Errorf("jobId=%s not exist and is not multi-instance job", req.JobId)
		return common.JobNotExist, fmt.Errorf("jobId=%s not exist and is not multi-instance", req.JobId)
	case ps.serveJobNum() >= constant.MaxServeJobs:
		return common.OutOfMaxServeJobs,
			fmt.Errorf("jobId=%s out of max serve jobs", req.JobId)
	default:
		return common.OK, nil
	}
}
// serveJobNum returns the number of entries in the publisher table, read under
// the read lock.
func (ps *SwitchManager) serveJobNum() int {
	ps.lock.RLock()
	count := len(ps.publishers)
	ps.lock.RUnlock()
	return count
}
// preemptPublisher installs a fresh publisher for jobId and returns it.
// A publisher already registered for the job is stopped first, so the newest
// subscriber always takes over. The whole swap happens under the write lock.
func (ps *SwitchManager) preemptPublisher(jobId string) *config.ConfigPublisher[*profiling.DataStatusRes] {
	ps.lock.Lock()
	defer ps.lock.Unlock()
	// A missing key yields a nil pointer, so one nil check covers both the
	// "absent" and the "present but nil" cases.
	if previous := ps.publishers[jobId]; previous != nil {
		previous.Stop()
	}
	replacement := config.NewConfigPublisher[*profiling.DataStatusRes](jobId,
		ps.ctx, constant.ProfilingDataType, nil)
	ps.publishers[jobId] = replacement
	return replacement
}
// deletePublisher removes the publisher registered for jobId, but only when
// its create time equals createTime. This keeps a finished subscriber from
// removing a newer publisher that has since replaced its own.
func (ps *SwitchManager) deletePublisher(jobId string, createTime time.Time) {
	ps.lock.Lock()
	defer ps.lock.Unlock()
	// A missing key yields a nil pointer, which is skipped like a nil entry.
	current := ps.publishers[jobId]
	if current != nil && createTime.Equal(current.GetCreateTime()) {
		delete(ps.publishers, jobId)
	}
}