Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"context"
"errors"
"fmt"
"sync"
"time"
"ascend-common/common-utils/hwlog"
"clusterd/pkg/common/constant"
"clusterd/pkg/domain/common"
"clusterd/pkg/domain/epranktable"
"clusterd/pkg/interface/grpc/config"
)
// BusinessConfigServer implements the config gRPC service and keeps one
// rank table publisher per job.
type BusinessConfigServer struct {
	// serviceCtx is the context handed to every publisher this server creates.
	serviceCtx context.Context
	// configPublisher maps a job ID to the publisher serving that job's
	// rank table stream. Access it only while holding lock.
	configPublisher map[string]*ConfigPublisher[*config.RankTableStream]
	// rankTableManager is the manager whose HandlerRankTable callback is
	// pointed at this server's rankTableChange method by the constructor.
	rankTableManager *epranktable.RankTableManager
	// lock guards configPublisher.
	lock sync.RWMutex
	// UnimplementedConfigServer is embedded so RPCs that are not overridden
	// here fall back to the generated implementation.
	config.UnimplementedConfigServer
}
// NewBusinessConfigServer builds a BusinessConfigServer bound to ctx, with an
// empty publisher table, and registers the server's rankTableChange method as
// the HandlerRankTable callback of the global EP rank table manager.
func NewBusinessConfigServer(ctx context.Context) *BusinessConfigServer {
	manager := epranktable.GetEpGlobalRankTableManager()
	srv := &BusinessConfigServer{
		serviceCtx:       ctx,
		configPublisher:  map[string]*ConfigPublisher[*config.RankTableStream]{},
		rankTableManager: manager,
		// lock is left at its zero value, which is an unlocked RWMutex.
	}
	// Route rank table updates from the manager into this server.
	srv.rankTableManager.HandlerRankTable = srv.rankTableChange
	return srv
}
// rankTableChange is the callback installed as the rank table manager's
// HandlerRankTable. It wraps data in a RankTableStream for jobId and hands it
// to that job's publisher.
//
// It returns (true, err) when the job has no usable publisher, the publisher
// has no subscriber, or the publisher refuses the data; it returns
// (false, nil) once the data has been saved.
// NOTE(review): the meaning of the boolean is defined by the caller in
// epranktable (presumably a "retry" flag) — confirm there.
func (c *BusinessConfigServer) rankTableChange(jobId, data string) (bool, error) {
	publisher, ok := c.getPublisher(jobId)
	// Treat a nil map entry the same as a missing one, matching the nil
	// guards used by the other publisher accessors in this file, so a nil
	// publisher is reported as an error rather than used.
	if !ok || publisher == nil || !publisher.IsSubscribed() {
		return true, errors.New("job not registered or not subscribed")
	}
	hwlog.RunLog.Infof("ranktable changed, jobId=%s", jobId)
	rankTable := &config.RankTableStream{
		JobId:     jobId,
		RankTable: data,
	}
	if isSaved := publisher.SaveData(jobId, rankTable); !isSaved {
		return true, errors.New("save data failed")
	}
	return false, nil
}
// Register handles the Register RPC: it logs the request and installs a fresh
// publisher for req.JobId, stopping any publisher previously registered under
// that job ID. ctx is not used. It always answers with an OK status and a nil
// error.
func (c *BusinessConfigServer) Register(ctx context.Context, req *config.ClientInfo) (*config.Status, error) {
	jobID, role := req.JobId, req.Role
	hwlog.RunLog.Infof("business config service receive Register request, jobId=%s, role=%s", jobID, role)

	// The new publisher is stored in the table; the returned pointer is not needed here.
	c.preemptPublisher(jobID)

	status := &config.Status{
		Code: int32(common.OK),
		Info: "register success",
	}
	return status, nil
}
// SubscribeRankTable handles the server-streaming SubscribeRankTable RPC.
//
// The job must already have a publisher (created by Register); otherwise a
// warning is logged and an error is returned. For a registered job it enqueues
// a GenerateGlobalRankTableMessage for the job on the rank table message
// queue, then hands the stream to the publisher via ListenDataChange.
// NOTE(review): ListenDataChange presumably blocks while the stream is being
// served — confirm in ConfigPublisher.
//
// Once ListenDataChange returns, the publisher is removed from the table, but
// only if the table still holds a publisher with the same create time, so a
// publisher installed by a later Register for the same job is left in place.
func (c *BusinessConfigServer) SubscribeRankTable(request *config.ClientInfo,
	stream config.Config_SubscribeRankTableServer) error {
	// The label is "role" (not "rule") to match the field being logged and
	// the other log lines in this file.
	hwlog.RunLog.Infof("receive Subscribe ranktable request, jobId=%s, role=%s",
		request.JobId, request.Role)
	publisher, ok := c.getPublisher(request.JobId)
	if !ok || publisher == nil {
		hwlog.RunLog.Warnf("jobId=%s not registered, role=%s", request.JobId, request.Role)
		return fmt.Errorf("jobId=%s not registered, role=%s", request.JobId, request.Role)
	}
	epranktable.GetRankTableMessageQueue().AddRateLimited(&epranktable.GenerateGlobalRankTableMessage{
		JobId:     request.JobId,
		Namespace: "",
	})
	publisher.ListenDataChange(stream)

	// Read the create time once and use it for both the conditional delete
	// and the log line.
	createTime := publisher.GetCreateTime()
	c.deletePublisher(request.JobId, createTime)
	hwlog.RunLog.Infof("jobId=%s stop subscribe ranktable, createTime=%v",
		request.JobId, createTime.UnixNano())
	return nil
}
// preemptPublisher replaces the publisher registered for jobId. Under the
// write lock it stops the existing publisher, if there is one, then creates a
// new rank table publisher, stores it under jobId and returns it.
func (c *BusinessConfigServer) preemptPublisher(jobId string) *ConfigPublisher[*config.RankTableStream] {
	c.lock.Lock()
	defer c.lock.Unlock()

	// A missing key yields a nil pointer, so a single nil check covers both
	// "absent" and "present but nil".
	if previous := c.configPublisher[jobId]; previous != nil {
		previous.Stop()
	}

	replacement := NewConfigPublisher[*config.RankTableStream](jobId, c.serviceCtx,
		constant.RankTableDataType, nil)
	c.configPublisher[jobId] = replacement
	return replacement
}
// getPublisher looks up the publisher stored for jobId while holding the read
// lock. The boolean reports whether the table has an entry for jobId.
func (c *BusinessConfigServer) getPublisher(jobId string) (*ConfigPublisher[*config.RankTableStream], bool) {
	c.lock.RLock()
	entry, found := c.configPublisher[jobId]
	c.lock.RUnlock()
	return entry, found
}
// deletePublisher removes the publisher stored for jobId, but only when that
// publisher's create time equals createTime. If the job has no publisher, or
// the stored one has a different create time, the table is left untouched.
func (c *BusinessConfigServer) deletePublisher(jobId string, createTime time.Time) {
	c.lock.Lock()
	defer c.lock.Unlock()

	// A missing key yields a nil pointer, so this covers absent and nil entries.
	current := c.configPublisher[jobId]
	if current == nil {
		return
	}
	if createTime.Equal(current.GetCreateTime()) {
		delete(c.configPublisher, jobId)
	}
}
// addPublisher creates a new rank table publisher for jobId and stores it in
// the table under the write lock. Any entry already stored for jobId is
// overwritten without Stop being called on it.
func (c *BusinessConfigServer) addPublisher(jobId string) {
	c.lock.Lock()
	defer c.lock.Unlock()

	c.configPublisher[jobId] = NewConfigPublisher[*config.RankTableStream](jobId, c.serviceCtx,
		constant.RankTableDataType, nil)
}