Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"context"
"io"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"ascend-common/common-utils/hwlog"
"clusterd/pkg/common/logs"
"clusterd/pkg/interface/grpc/config"
"clusterd/pkg/interface/grpc/fault"
"clusterd/pkg/interface/grpc/profiling"
)
const (
	// retryTimes is the maximum number of Send attempts sendDataToClient makes for one item.
	retryTimes = 3
	// waitSendTime bounds how long sendWithTimeout waits for a single stream.Send call.
	waitSendTime = 3 * time.Second
	// chanBufferSize is the capacity of each publisher's sendChan; SaveData rejects data once it is full.
	chanBufferSize = 1000
)
// signalType constrains the payload type a ConfigPublisher can carry to one of
// three pointer message types from the clusterd grpc interface packages.
type signalType interface {
	*config.RankTableStream | *fault.FaultMsgSignal | *profiling.DataStatusRes
}
// grpcServerStreamType is the server-side stream ListenDataChange writes to: a
// grpc.ServerStream that additionally offers a Send method typed for T.
type grpcServerStreamType[T signalType] interface {
	Send(T) error
	grpc.ServerStream
}
// jobDataForChan is one item queued on a publisher's sendChan.
type jobDataForChan[T signalType] struct {
	// jobId keys the sentData bookkeeping for this item (it may differ from the publisher's own jobId).
	jobId string
	// data is the payload forwarded to the client stream.
	data T
}
// ConfigPublisher queues payloads of type T via SaveData and forwards them to a
// single subscribed gRPC stream via ListenDataChange.
type ConfigPublisher[T signalType] struct {
	// jobId is the job this publisher was created for; used in log messages.
	jobId string
	// NOTE(review): role is neither set nor read in this file — confirm its use elsewhere.
	role string
	// dataType is a label used in log messages.
	dataType string
	// sendChan is the buffered queue filled by SaveData and drained by ListenDataChange; closed by Stop.
	sendChan chan *jobDataForChan[T]
	// sentData holds the last successfully sent payload per jobId.
	sentData map[string]T
	// subscribe is true while ListenDataChange is running.
	subscribe bool
	// compareFunc is optional; when it returns true for (new, last sent) the new item is skipped.
	compareFunc func(T, T) bool
	// ctxContext is derived from serviceContext and cancelled by Stop.
	ctxContext     context.Context
	ctxCancelFunc  context.CancelFunc
	serviceContext context.Context
	// isChanClosed is set by Stop after it closes sendChan.
	isChanClosed bool
	createTime   time.Time
	// lock is taken by the accessor methods and by Stop.
	lock sync.RWMutex
}
// NewConfigPublisher builds a publisher for jobId with a send buffer of
// chanBufferSize items. Its internal context is derived from serviceCtx, so
// cancelling serviceCtx (or calling Stop) ends any ListenDataChange loop.
// compareFunc may be nil; when non-nil it is used to skip data equal to the
// payload last sent for the same job.
func NewConfigPublisher[T signalType](jobId string, serviceCtx context.Context, dataType string,
	compareFunc func(T, T) bool) *ConfigPublisher[T] {
	ctx, cancel := context.WithCancel(serviceCtx)
	return &ConfigPublisher[T]{
		jobId:          jobId,
		dataType:       dataType,
		sendChan:       make(chan *jobDataForChan[T], chanBufferSize),
		sentData:       make(map[string]T),
		compareFunc:    compareFunc,
		ctxContext:     ctx,
		ctxCancelFunc:  cancel,
		serviceContext: serviceCtx,
		createTime:     time.Now(),
	}
}
// ListenDataChange marks the publisher as subscribed, then forwards queued data to
// stream until selectChanAndContext says to stop, and finally marks it unsubscribed.
// It blocks for the lifetime of the subscription.
func (c *ConfigPublisher[T]) ListenDataChange(stream grpcServerStreamType[T]) {
	hwlog.RunLog.Infof("start listen a new %s sendChan, jobId=%s, createTime=%v",
		c.dataType, c.jobId, c.createTime.UnixNano())
	c.SetSubscribe(true)
	for keepGoing := true; keepGoing; {
		keepGoing = c.selectChanAndContext(stream)
	}
	c.SetSubscribe(false)
}
// selectChanAndContext waits for a single event and handles it, returning whether
// ListenDataChange should keep looping.
//
// It returns false when the publisher context is cancelled, the stream context is
// done, sendChan has been closed, or sendDataToClient reports the client is gone.
// A nil item, or one that compareFunc judges equal to the payload last sent for its
// jobId, is skipped without sending.
func (c *ConfigPublisher[T]) selectChanAndContext(stream grpcServerStreamType[T]) bool {
	select {
	case <-c.ctxContext.Done():
		hwlog.RunLog.Warnf("context canceled, jobId=%s", c.jobId)
		return false
	case <-stream.Context().Done():
		hwlog.RunLog.Warnf("stream is closed, do not send %s, jobId=%s", c.dataType, c.jobId)
		return false
	case item, open := <-c.sendChan:
		if !open {
			hwlog.RunLog.Warnf("%s sendChan closed, jobId=%s break listen sendChan", c.dataType, c.jobId)
			return false
		}
		if item == nil {
			return true
		}
		if c.compareFunc != nil && c.compareFunc(item.data, c.GetSentData(item.jobId)) {
			return true
		}
		delivered, keepListening := sendDataToClient(stream, item.data, c.jobId, c.dataType)
		if delivered {
			c.SetSentData(item.jobId, item.data)
		}
		return keepListening
	}
}
// sendDataToClient sends data over stream, making up to retryTimes attempts and
// waiting one second between failed attempts (aborting the wait if the stream
// context ends).
//
// It returns (sendSuccess, stillListen). sendSuccess reports whether a Send
// succeeded. stillListen is false when the client went away (io.EOF from Send, or
// the stream context finished during a retry wait), telling the caller to stop.
func sendDataToClient[T signalType](stream grpcServerStreamType[T], data T, jobId, dataType string) (bool, bool) {
	for i := 0; i < retryTimes; i++ {
		err := sendWithTimeout(stream, data)
		if err == nil {
			hwlog.RunLog.Infof("send %s success, jobId=%s", dataType, jobId)
			logs.GrpcEventLogger.Infof("send %s success, jobId=%s, data=%v", dataType, jobId, data)
			return true, true
		}
		if err == io.EOF {
			hwlog.RunLog.Warnf("send %s failed, client cancel connection, jobId=%s", dataType, jobId)
			return false, false
		}
		hwlog.RunLog.Errorf("send %s failed, jobId=%s, error= %v", dataType, jobId, err)
		if i >= retryTimes-1 {
			break
		}
		// Use a fresh timer for every back-off. The previous single timer was created
		// before the first send and Reset without draining; if that send took >= 1s the
		// stale tick (before Go 1.23) made the first retry fire with no delay at all.
		retryTimer := time.NewTimer(time.Second)
		select {
		case <-retryTimer.C:
		case <-stream.Context().Done():
			retryTimer.Stop()
			hwlog.RunLog.Warnf("stream is closed, do not send %s, jobId=%s", dataType, jobId)
			return false, false
		}
	}
	return false, true
}
// sendWithTimeout runs stream.Send(data) in its own goroutine and waits at most
// waitSendTime for it. It returns Send's error, or a codes.DeadlineExceeded status
// error if the wait expires first.
//
// NOTE(review): on timeout the Send goroutine keeps running; its result goes into the
// buffered channel and is discarded. A caller that retries may therefore call
// stream.Send while the earlier call is still pending, and grpc-go documents concurrent
// SendMsg on one stream as unsafe. Confirm whether this path is reachable in practice.
func sendWithTimeout[T signalType](stream grpcServerStreamType[T], data T) error {
	result := make(chan error, 1)
	go func() {
		result <- stream.Send(data)
	}()
	deadline := time.NewTimer(waitSendTime)
	defer deadline.Stop()
	select {
	case <-deadline.C:
		return status.Error(codes.DeadlineExceeded, "send data timeout")
	case err := <-result:
		return err
	}
}
// SaveData enqueues data for jobId on sendChan without blocking.
//
// It returns true when the item was queued. It returns false when Stop has already
// closed the channel, when the buffer is full, or if the send panics (the panic is
// recovered and logged).
func (c *ConfigPublisher[T]) SaveData(jobId string, data T) (saved bool) {
	defer func() {
		if r := recover(); r != nil {
			// saved is a named result, so this assignment really sets the return value.
			saved = false
			hwlog.RunLog.Errorf("panic occured when saving %s, jobId=%s err=%v", c.dataType, c.jobId, r)
		}
	}()
	// Hold the read lock so Stop (which takes the write lock before closing sendChan)
	// cannot close the channel between the closed-check and the send.
	c.lock.RLock()
	defer c.lock.RUnlock()
	if c.isChanClosed {
		return false
	}
	// Non-blocking send: a separate length check followed by a send is not atomic and
	// could block while holding the lock if the buffer filled up in between.
	select {
	case c.sendChan <- &jobDataForChan[T]{jobId: jobId, data: data}:
		return true
	default:
		hwlog.RunLog.Warnf("sendChan is full, do not send %s jobId=%s", c.dataType, c.jobId)
		return false
	}
}
// Stop cancels the publisher's context and closes sendChan under the write lock.
// Repeated calls are harmless: once the channel is marked closed, Stop only logs.
func (c *ConfigPublisher[T]) Stop() {
	hwlog.RunLog.Infof("jobId=%s enter %s stop function", c.jobId, c.dataType)
	c.lock.Lock()
	defer c.lock.Unlock()
	if !c.isChanClosed {
		if cancel := c.ctxCancelFunc; cancel != nil {
			cancel()
		}
		close(c.sendChan)
		c.isChanClosed = true
	}
}
// SetSubscribe records, under the write lock, whether a client is listening.
func (c *ConfigPublisher[T]) SetSubscribe(isSubscribed bool) {
	c.lock.Lock()
	c.subscribe = isSubscribed
	c.lock.Unlock()
}
// IsSubscribed reports the flag most recently stored by SetSubscribe.
func (c *ConfigPublisher[T]) IsSubscribed() bool {
	c.lock.RLock()
	subscribed := c.subscribe
	c.lock.RUnlock()
	return subscribed
}
// SetSentData stores data as the last payload successfully sent for jobId.
func (c *ConfigPublisher[T]) SetSentData(jobId string, data T) {
	mu := &c.lock
	mu.Lock()
	defer mu.Unlock()
	c.sentData[jobId] = data
}
// GetSentData returns the payload last stored for jobId by SetSentData, or the zero
// value of T when nothing has been stored for it.
func (c *ConfigPublisher[T]) GetSentData(jobId string) T {
	c.lock.RLock()
	last := c.sentData[jobId]
	c.lock.RUnlock()
	return last
}
// GetAllSentJobIdList returns the jobIds that currently have an entry in sentData,
// in unspecified (map iteration) order.
func (c *ConfigPublisher[T]) GetAllSentJobIdList() []string {
	c.lock.RLock()
	defer c.lock.RUnlock()
	// len(c.sentData) must also be read under the lock; reading it before locking
	// raced with SetSentData / ClearDeletedJobIdList.
	jobIdList := make([]string, 0, len(c.sentData))
	for jobId := range c.sentData {
		jobIdList = append(jobIdList, jobId)
	}
	return jobIdList
}
// ClearDeletedJobIdList removes the sentData entries for every jobId in jobKeyList.
// Unknown jobIds are ignored.
func (c *ConfigPublisher[T]) ClearDeletedJobIdList(jobKeyList []string) {
	// delete mutates the map, so the write lock is required; holding only the read
	// lock allowed concurrent readers/writers and is a data race.
	c.lock.Lock()
	defer c.lock.Unlock()
	for _, jobId := range jobKeyList {
		delete(c.sentData, jobId)
	}
}
// GetSentChan returns the publisher's send channel, read under the read lock.
func (c *ConfigPublisher[T]) GetSentChan() chan *jobDataForChan[T] {
	c.lock.RLock()
	ch := c.sendChan
	c.lock.RUnlock()
	return ch
}
// GetCreateTime returns the time recorded when NewConfigPublisher built this
// publisher. No method in this file writes createTime afterwards, so no lock is taken.
func (c *ConfigPublisher[T]) GetCreateTime() time.Time {
	created := c.createTime
	return created
}
// GetJobId returns the jobId the publisher was created with, or "" for a nil receiver.
func (c *ConfigPublisher[T]) GetJobId() string {
	if c != nil {
		return c.jobId
	}
	return ""
}