package recover
import (
"context"
"errors"
"fmt"
"time"
"ascend-common/common-utils/hwlog"
"clusterd/pkg/application/faultmanager"
"clusterd/pkg/common/constant"
"clusterd/pkg/common/util"
"clusterd/pkg/domain/common"
"clusterd/pkg/domain/job"
"clusterd/pkg/interface/grpc/recover"
"clusterd/pkg/interface/kube"
)
const (
// stressTestTimeout is the value written to the pause-train signal's Timeout
// field while a stress test is pending (see handleNotifyPauseTrain).
// NOTE(review): 15 * 60 is presumably seconds, i.e. 15 minutes — confirm the unit
// against the consumer of ProcessManageSignal.Timeout.
stressTestTimeout = 15 * 60
)
// handleNotifyPauseTrain assigns a fresh event uuid to the controller and
// enqueues a stop-train signal that carries the pause-train actions.
// When a stress test is pending, the signal also carries stressTestTimeout.
// The result of ctl.signalEnqueue is returned unchanged.
func (ctl *EventController) handleNotifyPauseTrain() (string, common.RespCode, error) {
	ctl.uuid = common.NewEventId(randomLen)
	// ChangeStrategy is intentionally left at its zero value ("").
	pauseSignal := &pb.ProcessManageSignal{
		Uuid:       ctl.uuid,
		JobId:      ctl.jobInfo.JobId,
		SignalType: constant.StopTrainSignalType,
		Actions:    pauseTrainActions,
	}
	if ctl.isStressTest() {
		pauseSignal.Timeout = stressTestTimeout
	}
	return ctl.signalEnqueue(pauseSignal)
}
// handleWaitPauseTrainComplete blocks until one of the following happens:
//   - the controller context is canceled (ControllerEventCancel),
//   - a report arrives on the stop-complete channel, or
//   - reportTimeoutMinutes minutes elapse (ReportTimeoutEvent).
//
// A report whose status code is UnRecoverTrainError yields ProcessPauseFailEvent;
// otherwise the returned event depends on whether a switch-nic or stress-test
// operation is in progress. A nil channel is reported as ServerInnerError.
func (ctl *EventController) handleWaitPauseTrainComplete() (string, common.RespCode, error) {
	ctx, stopCompleteChan := ctl.getCtxAndStopCompleteChan()
	if stopCompleteChan == nil {
		ctl.replyOMResponse("pause train failed, job service not ready")
		hwlog.RunLog.Infof("jobId=%s, reportChan is nil", ctl.jobInfo.JobId)
		return "", common.ServerInnerError, fmt.Errorf("jobId=%s, reportChan is nil", ctl.jobInfo.JobId)
	}

	waitLimit := time.Duration(reportTimeoutMinutes) * time.Minute
	select {
	case <-ctx.Done():
		hwlog.RunLog.Warnf("controller context canceled, jobId=%s, uuid=%s", ctl.jobInfo.JobId, ctl.uuid)
		return "", common.ControllerEventCancel, nil
	case report := <-stopCompleteChan:
		if report.Status.Code == common.UnRecoverTrainError {
			ctl.replyOMResponse("om failed, pause train failed")
			return common.ProcessPauseFailEvent, common.ClientError, nil
		}
		// Switch-nic takes precedence over stress test when choosing the follow-up event.
		switch {
		case ctl.isSwitchingNic():
			return common.SwitchNicRecvPauseEvent, common.OK, nil
		case ctl.isStressTest():
			return common.StressTestRecvPauseEvent, common.OK, nil
		default:
			return "", common.OK, nil
		}
	case <-time.After(waitLimit):
		hwlog.RunLog.Errorf("wait report pause train complete timeout, jobId=%s, uuid=%s", ctl.jobInfo.JobId, ctl.uuid)
		ctl.replyOMResponse("pause train timeout")
		return common.ReportTimeoutEvent, common.WaitReportTimeout, nil
	}
}
// notifySwitchNic packs the stored switch-nic parameters (rank IDs and ops)
// together with the job ID into a SwitchRankList and enqueues it through
// switchNicSignalEnqueue, returning that call's result.
func (ctl *EventController) notifySwitchNic() (string, common.RespCode, error) {
	rankIDs, ops := ctl.getSwitchNicParam()
	return ctl.switchNicSignalEnqueue(&pb.SwitchRankList{
		RankID: rankIDs,
		Op:     ops,
		JobId:  ctl.jobInfo.JobId,
	})
}
// notifyContinueTrain enqueues a change-strategy signal asking the job to
// continue training. The signal carries the cached normal faults and the node
// rank IDs resolved from them; a resolution failure is only logged and the
// signal is still enqueued with whatever value the lookup returned.
func (ctl *EventController) notifyContinueTrain() (string, common.RespCode, error) {
	continueSignal := &pb.ProcessManageSignal{
		Uuid:           ctl.uuid,
		JobId:          ctl.jobInfo.JobId,
		SignalType:     constant.ChangeStrategySignalType,
		Actions:        changeStrategyActions,
		FaultRanks:     ctl.cacheNormalFault,
		ChangeStrategy: constant.ProcessContinueTrain,
	}

	nodeRankIds, lookupErr := common.GetNodeRankIdsByFaultRanks(ctl.jobInfo.JobId, continueSignal.FaultRanks)
	// The lookup result is stored even on error, matching the best-effort contract.
	continueSignal.NodeRankIds = nodeRankIds
	if lookupErr != nil {
		hwlog.RunLog.Warnf("jobId=%s, GetNodeRankIdsByFaultRanks err:%v", ctl.jobInfo.JobId, lookupErr)
	}
	return ctl.signalEnqueue(continueSignal)
}
// handleSwitchNicFinish reports the end of the switch-nic flow to the OM side
// and then calls ctl.reset(false). It always returns an empty event with OK.
func (ctl *EventController) handleSwitchNicFinish() (string, common.RespCode, error) {
	const finishMsg = "switch nic finish"
	ctl.replyOMResponse(finishMsg)
	ctl.reset(false)
	return "", common.OK, nil
}
// handleWaitSwitchNicFinish waits for the outcome of a switch-nic operation.
//
// It returns ServerInnerError with a non-nil error when either the recover
// report channel or the switch-nic result channel is nil. Otherwise it blocks
// until the first of:
//   - controller context canceled          -> ControllerEventCancel
//   - a switch-nic result arrives          -> ReceiveReportEvent on success,
//     SwitchNicFailEvent on failure
//   - a recover status report arrives      -> WaitSwitchNicRecvFaultEvent when its
//     code is UnRecoverableRetryError, otherwise an empty event with OK
//   - reportTimeoutMinutes minutes elapse  -> ReportTimeoutEvent
func (ctl *EventController) handleWaitSwitchNicFinish() (string, common.RespCode, error) {
	hwlog.RunLog.Infof("jobId=%s, wait switch nic finish....", ctl.jobInfo.JobId)
	// Only the channel is needed here; the context used below comes from the
	// second getter, exactly as before.
	_, ch := ctl.getCtxAndResultChan()
	if ch == nil {
		hwlog.RunLog.Infof("jobId=%s, reportChan is nil", ctl.jobInfo.JobId)
		ctl.replyOMResponse("switch nic failed, job service not ready")
		return "", common.ServerInnerError, fmt.Errorf("jobId=%s, reportChan is nil", ctl.jobInfo.JobId)
	}
	ctx, rch := ctl.getCtxAndSwitchNicResultChan()
	if rch == nil {
		hwlog.RunLog.Infof("jobId=%s, resultChan is nil", ctl.jobInfo.JobId)
		ctl.replyOMResponse("switch nic failed, job service not ready")
		// The returned error names the channel that is actually nil (it previously
		// said "reportChan", contradicting the log line above).
		return "", common.ServerInnerError, fmt.Errorf("jobId=%s, resultChan is nil", ctl.jobInfo.JobId)
	}
	select {
	case <-ctx.Done():
		hwlog.RunLog.Warnf("controller context canceled, jobId=%s, uuid=%s", ctl.jobInfo.JobId, ctl.uuid)
		return "", common.ControllerEventCancel, nil
	case req := <-rch:
		if !req.Result {
			ctl.replyOMResponse("switch nic failed, switch nic failed")
			return common.SwitchNicFailEvent, common.ClientError, nil
		}
		return common.ReceiveReportEvent, common.OK, nil
	case req := <-ch:
		if req.Status.Code == common.UnRecoverableRetryError {
			ctl.replyOMResponse("switch nic failed, report error when switching nic")
			return common.WaitSwitchNicRecvFaultEvent, common.ClientError, nil
		}
		// Any other status code ends the wait without raising an event.
		return "", common.OK, nil
	case <-time.After(time.Duration(reportTimeoutMinutes) * time.Minute):
		hwlog.RunLog.Errorf("wait report switch nic complete timeout, jobId=%s, uuid=%s", ctl.jobInfo.JobId, ctl.uuid)
		ctl.replyOMResponse("switch nic failed, report switch nic timeout")
		return common.ReportTimeoutEvent, common.WaitReportTimeout, nil
	}
}
// handleDecideContinueTrainComplete waits for the report that follows a
// continue-train request. It returns ControllerEventCancel on context
// cancellation, ContinueTrainFailEvent when the report carries
// UnRecoverTrainError, ReportTimeoutEvent after reportTimeoutMinutes minutes,
// and otherwise an event chosen by the operation in progress (switch nic first,
// then stress test, else none). A nil report channel yields ServerInnerError.
func (ctl *EventController) handleDecideContinueTrainComplete() (string, common.RespCode, error) {
	ctx, reportChan := ctl.getCtxAndResultChan()
	if reportChan == nil {
		hwlog.RunLog.Infof("jobId=%s, reportChan is nil", ctl.jobInfo.JobId)
		ctl.replyOMResponse("continue train failed, job service not ready")
		return "", common.ServerInnerError, fmt.Errorf("jobId=%s, reportChan is nil", ctl.jobInfo.JobId)
	}

	waitLimit := time.Duration(reportTimeoutMinutes) * time.Minute
	select {
	case <-ctx.Done():
		hwlog.RunLog.Warnf("controller context canceled, jobId=%s, uuid=%s", ctl.jobInfo.JobId, ctl.uuid)
		return "", common.ControllerEventCancel, nil
	case report := <-reportChan:
		if report.Status.Code == common.UnRecoverTrainError {
			ctl.replyOMResponse("switch nic failed, continue train failed")
			return common.ContinueTrainFailEvent, common.ClientError, nil
		}
		switch {
		case ctl.isSwitchingNic():
			return common.SwitchNicRecvContinueEvent, common.OK, nil
		case ctl.isStressTest():
			return common.StressTestRecvContinueEvent, common.OK, nil
		default:
			return "", common.OK, nil
		}
	case <-time.After(waitLimit):
		hwlog.RunLog.Errorf("wait report continue train complete timeout, jobId=%s, uuid=%s", ctl.jobInfo.JobId, ctl.uuid)
		ctl.replyOMResponse("switch nic failed, continue train timeout")
		return common.ReportTimeoutEvent, common.WaitReportTimeout, nil
	}
}
// listenSwitchNicChannel forwards at most one switch-nic response from the
// controller's response channel to the given stream (see
// selectSendSwitchNicResponseChan for the exit conditions).
func (ctl *EventController) listenSwitchNicChannel(stream pb.Recover_SubscribeSwitchNicSignalServer) {
	ctrlCtx, responseChan := ctl.getCtxAndSwitchNicResponseChan()
	hwlog.RunLog.Infof("start listen a new send channel, jobId=%s", ctl.jobInfo.JobId)
	ctl.selectSendSwitchNicResponseChan(ctrlCtx, responseChan, stream)
}
// getCtxAndSwitchNicResponseChan returns the controller context and the
// switch-nic response channel, both read under the controller's read lock.
func (ctl *EventController) getCtxAndSwitchNicResponseChan() (context.Context, chan *pb.SwitchNicResponse) {
	ctl.lock.RLock()
	ctrlCtx, responseChan := ctl.controllerContext, ctl.switchNicResponse
	ctl.lock.RUnlock()
	return ctrlCtx, responseChan
}
// switchNicSignalEnqueue tries to push signal onto the switch-nic notify
// channel. It gives up with SignalQueueBusy when the channel is nil or the send
// does not complete within one second, and with ControllerEventCancel when the
// controller context is canceled first.
func (ctl *EventController) switchNicSignalEnqueue(signal *pb.SwitchRankList) (string, common.RespCode, error) {
	ctrlCtx, notifyChan := ctl.getCtxAndSwitchNicNotifyChan()
	if notifyChan == nil {
		hwlog.RunLog.Errorf("jobId=%s, sendChan is nil", ctl.jobInfo.JobId)
		return "", common.SignalQueueBusy, errors.New("sendChan is nil")
	}

	const enqueueLimit = time.Second
	select {
	case notifyChan <- signal:
		hwlog.RunLog.Infof("signal enqueue, jobId=%s, ranks=%v, ops=%v", signal.JobId, signal.RankID, signal.Op)
		return "", common.OK, nil
	case <-ctrlCtx.Done():
		hwlog.RunLog.Warnf("controller context canceled, jobId=%s, uuid=%s", ctl.jobInfo.JobId, ctl.uuid)
		return "", common.ControllerEventCancel, nil
	case <-time.After(enqueueLimit):
		timeoutMsg := fmt.Sprintf("add signal time-out for jobId=%s, program may running in chaos", signal.JobId)
		hwlog.RunLog.Error(timeoutMsg)
		return "", common.SignalQueueBusy, errors.New(timeoutMsg)
	}
}
// getCtxAndSwitchNicNotifyChan returns the controller context and the channel
// carrying SwitchRankList notifications, both read under the read lock.
func (ctl *EventController) getCtxAndSwitchNicNotifyChan() (context.Context, chan *pb.SwitchRankList) {
	ctl.lock.RLock()
	ctrlCtx, notifyChan := ctl.controllerContext, ctl.switchRankList
	ctl.lock.RUnlock()
	return ctrlCtx, notifyChan
}
// getCtxAndSwitchNicResultChan returns the controller context and the channel
// on which switch-nic results are delivered, both read under the read lock.
func (ctl *EventController) getCtxAndSwitchNicResultChan() (context.Context, chan *pb.SwitchResult) {
	ctl.lock.RLock()
	ctrlCtx, resultChan := ctl.controllerContext, ctl.switchRankResult
	ctl.lock.RUnlock()
	return ctrlCtx, resultChan
}
// listenSwitchNicNotifyChannel keeps relaying switch-nic notifications to the
// stream until selectNotifySwitchNic reports that listening should stop.
func (ctl *EventController) listenSwitchNicNotifyChannel(stream pb.Recover_SubscribeNotifySwitchServer) {
	ctrlCtx, notifyChan := ctl.getCtxAndSwitchNicNotifyChan()
	hwlog.RunLog.Infof("start listen a new switch nic send channel, jobId=%s", ctl.jobInfo.JobId)
	// Each call handles one select round; a true result means "stop listening".
	for !ctl.selectNotifySwitchNic(ctrlCtx, notifyChan, stream) {
	}
}
// selectNotifySwitchNic performs one round of waiting on the notify channel.
// It returns true when the caller must stop listening (nil or closed channel,
// controller context done, stream context done) and false after a signal has
// been handled. A handled signal raises NotifySuccessEvent when the send to the
// stream succeeded and NotifyFailEvent (plus an OM reply) when it failed.
func (ctl *EventController) selectNotifySwitchNic(ctx context.Context, sendChan chan *pb.SwitchRankList,
	stream pb.Recover_SubscribeNotifySwitchServer) bool {
	if sendChan == nil {
		return true
	}
	select {
	case <-ctx.Done():
		hwlog.RunLog.Infof("context done, jobId=%s break listen sendChan", ctl.jobInfo.JobId)
		return true
	case <-stream.Context().Done():
		hwlog.RunLog.Infof("stream context done, jobId=%s break listen sendChan", ctl.jobInfo.JobId)
		return true
	case rankList, open := <-sendChan:
		if !open {
			hwlog.RunLog.Infof("sendChan closed, jobId=%s break listen sendChan", ctl.jobInfo.JobId)
			return true
		}
		if sendErr := common.SendWithRetry(stream, rankList, retryTimes); sendErr != nil {
			hwlog.RunLog.Errorf("send switch nic signal failed, err=%v, jobId=%s", sendErr, ctl.jobInfo.JobId)
			ctl.replyOMResponse("send switch nic signal failed, please check manually")
			ctl.addEvent(common.NotifyFailEvent)
			return false
		}
		hwlog.RunLog.Infof("switch nic signal=%v, jobId=%s", rankList, ctl.jobInfo.JobId)
		ctl.addEvent(common.NotifySuccessEvent)
		return false
	}
}
// selectSendSwitchNicResponseChan waits once for a switch-nic response and
// relays it to the stream. It returns without sending when the channel is nil
// or closed, when either context is done, or after reportTimeoutMinutes
// minutes. A failed relay is only logged.
func (ctl *EventController) selectSendSwitchNicResponseChan(ctx context.Context, sendChan chan *pb.SwitchNicResponse,
	stream pb.Recover_SubscribeSwitchNicSignalServer) {
	if sendChan == nil {
		return
	}
	waitLimit := time.Duration(reportTimeoutMinutes) * time.Minute
	select {
	case <-ctx.Done():
		hwlog.RunLog.Infof("context done, jobId=%s break listen sendChan", ctl.jobInfo.JobId)
	case <-stream.Context().Done():
		hwlog.RunLog.Infof("stream context done, jobId=%s break listen sendChan", ctl.jobInfo.JobId)
	case response, open := <-sendChan:
		if !open {
			hwlog.RunLog.Infof("sendChan closed, jobId=%s break listen sendChan", ctl.jobInfo.JobId)
			return
		}
		hwlog.RunLog.Infof("switch nic signal=%v, jobId=%s", response, ctl.jobInfo.JobId)
		if sendErr := common.SendWithRetry(stream, response, retryTimes); sendErr != nil {
			hwlog.RunLog.Errorf("send switch nic signal failed, err=%v, jobId=%s", sendErr, ctl.jobInfo.JobId)
		}
	case <-time.After(waitLimit):
		hwlog.RunLog.Errorf("report switch nic result timeout, jobId=%s, uuid=%s", ctl.jobInfo.JobId, ctl.uuid)
		// NOTE(review): replyOMResponse sends on ctl.switchNicResponse, which is
		// presumably the channel this function was consuming — confirm it is
		// buffered or has another reader, otherwise this send could block.
		ctl.replyOMResponse("report switch nic result timeout, please check manually")
	}
}
// setSwitchNicResult hands result to whoever is receiving on the switch-nic
// result channel. The send has no timeout or cancellation path, so it blocks
// until the channel accepts the value.
func (ctl *EventController) setSwitchNicResult(result *pb.SwitchResult) {
	resultChan := ctl.switchRankResult
	resultChan <- result
}
// setSwitchNicParam stores the rank IDs to switch and their matching ops under
// the controller's write lock. The slices are stored as-is (not copied).
func (ctl *EventController) setSwitchNicParam(globalSwitchRankIDs []string, globalOps []bool) {
	ctl.lock.Lock()
	ctl.globalSwitchRankIDs, ctl.globalOps = globalSwitchRankIDs, globalOps
	ctl.lock.Unlock()
}
// getSwitchNicParam returns the stored switch-nic rank IDs and ops, read under
// the controller's read lock. The returned slices are the stored ones, not copies.
func (ctl *EventController) getSwitchNicParam() ([]string, []bool) {
	ctl.lock.RLock()
	rankIDs, ops := ctl.globalSwitchRankIDs, ctl.globalOps
	ctl.lock.RUnlock()
	return rankIDs, ops
}
// isSwitchingNic reports whether any switch-nic rank IDs are currently stored
// on the controller.
func (ctl *EventController) isSwitchingNic() bool {
	ctl.lock.RLock()
	pendingRanks := len(ctl.globalSwitchRankIDs)
	ctl.lock.RUnlock()
	return pendingRanks != 0
}
// canDoSwitchingNic reports whether the controller's state machine is currently
// in InitState.
func (ctl *EventController) canDoSwitchingNic() bool {
	current := ctl.state.GetState()
	return current == common.InitState
}
// replyOMResponse publishes msg on the response channel of the operation in
// progress: the switch-nic response channel when switching NIC, otherwise the
// stress-test response channel when a stress test is pending. When neither
// operation is active the message is dropped. The channel send is blocking.
func (ctl *EventController) replyOMResponse(msg string) {
	switch {
	case ctl.isSwitchingNic():
		ctl.switchNicResponse <- &pb.SwitchNicResponse{
			Msg:   msg,
			JobID: ctl.jobInfo.JobId,
		}
	case ctl.isStressTest():
		ctl.stressTestResponse <- &pb.StressTestResponse{
			Msg:   msg,
			JobID: ctl.jobInfo.JobId,
		}
	}
}
// setStressTestParam stores the stress-test parameters under the controller's
// write lock.
func (ctl *EventController) setStressTestParam(param common.StressTestParam) {
	ctl.lock.Lock()
	ctl.stressTestParam = param
	ctl.lock.Unlock()
}
// isStressTest reports whether non-empty stress-test parameters are currently
// stored on the controller.
func (ctl *EventController) isStressTest() bool {
	ctl.lock.RLock()
	paramCount := len(ctl.stressTestParam)
	ctl.lock.RUnlock()
	return paramCount != 0
}
// getStressTestParam returns the stored stress-test parameters, read under the
// controller's read lock. The stored value itself is returned, not a copy.
func (ctl *EventController) getStressTestParam() common.StressTestParam {
	ctl.lock.RLock()
	param := ctl.stressTestParam
	ctl.lock.RUnlock()
	return param
}
// notifyStressTest flattens the stored stress-test parameters into a single
// rankID -> op list map and enqueues it, together with the job ID, through
// stressTestSignalEnqueue. If the same rank ID appears under several outer
// entries, the one visited last by map iteration wins.
func (ctl *EventController) notifyStressTest() (string, common.RespCode, error) {
	opsByRank := make(map[string]*pb.StressOpList)
	for _, rankParams := range ctl.getStressTestParam() {
		for rank, opList := range rankParams {
			opsByRank[rank] = &pb.StressOpList{Ops: opList}
		}
	}
	return ctl.stressTestSignalEnqueue(&pb.StressTestRankParams{
		StressParam: opsByRank,
		JobId:       ctl.jobInfo.JobId,
	})
}
// stressTestSignalEnqueue tries to push signal onto the stress-test notify
// channel. It gives up with SignalQueueBusy when the channel is nil or the send
// does not complete within one second, and with ControllerEventCancel when the
// controller context is canceled first.
func (ctl *EventController) stressTestSignalEnqueue(signal *pb.StressTestRankParams) (string, common.RespCode, error) {
	ctrlCtx, notifyChan := ctl.getCtxAndStressTestNotifyChan()
	if notifyChan == nil {
		hwlog.RunLog.Errorf("jobId=%s, sendChan is nil", ctl.jobInfo.JobId)
		return "", common.SignalQueueBusy, errors.New("sendChan is nil")
	}

	const enqueueLimit = time.Second
	select {
	case notifyChan <- signal:
		hwlog.RunLog.Infof("signal enqueue, jobId=%s, params=%v", signal.JobId, signal.StressParam)
		return "", common.OK, nil
	case <-ctrlCtx.Done():
		hwlog.RunLog.Warnf("controller context canceled, jobId=%s, uuid=%s", ctl.jobInfo.JobId, ctl.uuid)
		return "", common.ControllerEventCancel, nil
	case <-time.After(enqueueLimit):
		timeoutMsg := fmt.Sprintf("add signal time-out for jobId=%s, program may running in chaos", signal.JobId)
		hwlog.RunLog.Errorf("signal: %v enqueue time-out, %s", signal, timeoutMsg)
		return "", common.SignalQueueBusy, errors.New(timeoutMsg)
	}
}
// getCtxAndStressTestNotifyChan returns the controller context and the channel
// carrying stress-test notifications, both read under the read lock.
func (ctl *EventController) getCtxAndStressTestNotifyChan() (context.Context, chan *pb.StressTestRankParams) {
	ctl.lock.RLock()
	ctrlCtx, notifyChan := ctl.controllerContext, ctl.stressTestNotifyChan
	ctl.lock.RUnlock()
	return ctrlCtx, notifyChan
}
// listenStressTestNotifyChannel keeps relaying stress-test notifications to the
// stream until selectNotifyStressTest reports that listening should stop.
func (ctl *EventController) listenStressTestNotifyChannel(stream pb.Recover_SubscribeNotifyExecStressTestServer) {
	ctrlCtx, notifyChan := ctl.getCtxAndStressTestNotifyChan()
	hwlog.RunLog.Infof("start listen a new stress test send channel, jobId=%s", ctl.jobInfo.JobId)
	for stop := false; !stop; {
		stop = ctl.selectNotifyStressTest(ctrlCtx, notifyChan, stream)
	}
}
// selectNotifyStressTest performs one round of waiting on the stress-test
// notify channel. It returns true when the caller must stop listening (nil or
// closed channel, controller context done, stream context done) and false
// after a signal has been handled. A handled signal raises NotifySuccessEvent
// when the send to the stream succeeded and NotifyFailEvent (plus an OM reply)
// when it failed.
func (ctl *EventController) selectNotifyStressTest(ctx context.Context, sendChan chan *pb.StressTestRankParams,
	stream pb.Recover_SubscribeNotifyExecStressTestServer) bool {
	if sendChan == nil {
		return true
	}
	select {
	case <-ctx.Done():
		hwlog.RunLog.Infof("context done, jobId=%s break listen sendChan", ctl.jobInfo.JobId)
		return true
	case <-stream.Context().Done():
		hwlog.RunLog.Infof("stream context done, jobId=%s break listen sendChan", ctl.jobInfo.JobId)
		return true
	case rankParams, open := <-sendChan:
		if !open {
			hwlog.RunLog.Infof("sendChan closed, jobId=%s break listen sendChan", ctl.jobInfo.JobId)
			return true
		}
		if sendErr := common.SendWithRetry(stream, rankParams, retryTimes); sendErr != nil {
			hwlog.RunLog.Errorf("send stress test signal failed, err=%v, jobId=%s", sendErr, ctl.jobInfo.JobId)
			ctl.replyOMResponse("send stress test signal failed, please check manually")
			ctl.addEvent(common.NotifyFailEvent)
			return false
		}
		hwlog.RunLog.Infof("stress test signal=%v, jobId=%s", rankParams, ctl.jobInfo.JobId)
		ctl.addEvent(common.NotifySuccessEvent)
		return false
	}
}
// waitStressTestFinishRecvFault handles a fault reported while a stress test is
// still running. It first tells the OM side about the fault, then waits for the
// stress-test result. Whatever the result says, an arriving result leads to
// StressTestFailEvent; cancellation yields ControllerEventCancel and a wait
// longer than reportTimeoutMinutes minutes yields ReportTimeoutEvent.
func (ctl *EventController) waitStressTestFinishRecvFault(ctx context.Context,
	rch chan *pb.StressTestResult) (string, common.RespCode, error) {
	hwlog.RunLog.Warnf("recv fault, when stressing test, jobId=%s", ctl.jobInfo.JobId)
	ctl.replyOMResponse("recv fault, when stressing test")

	waitLimit := time.Duration(reportTimeoutMinutes) * time.Minute
	select {
	case <-ctx.Done():
		hwlog.RunLog.Warnf("controller context canceled, jobId=%s, uuid=%s", ctl.jobInfo.JobId, ctl.uuid)
		return "", common.ControllerEventCancel, nil
	case testResult := <-rch:
		// The pass/fail flag is ignored here: a fault was already received.
		_, summary := ctl.parseStressTestResult(testResult)
		ctl.replyOMResponse(summary)
		hwlog.RunLog.Warnf("stress test failed, start recover..., jobId=%s", ctl.jobInfo.JobId)
		return common.StressTestFailEvent, common.ClientError, nil
	case <-time.After(waitLimit):
		hwlog.RunLog.Errorf("wait report stress test timeout, jobId=%s, uuid=%s", ctl.jobInfo.JobId, ctl.uuid)
		ctl.replyOMResponse("stress test failed, report stress test timeout")
		return common.ReportTimeoutEvent, common.WaitReportTimeout, nil
	}
}
// handleWaitStressTestFinish prepares for and then waits on a running stress test.
//
// Steps, in order:
//  1. Collect the node names (keys of the stored stress-test parameters) and call
//     faultmanager.FilterStressTestFault with true for them; the matching call
//     with false is deferred so it runs on every return path.
//  2. Send the wait-start agent signal.
//  3. Write the fault-flushing operation to the reset ConfigMap; a failure is
//     logged but does not abort the wait.
//  4. Fetch the recover report channel and the stress-test result channel; if
//     either is nil, reply to OM and return ServerInnerError with an error.
//  5. Delegate the actual wait to waitStressTestDone.
func (ctl *EventController) handleWaitStressTestFinish() (string, common.RespCode, error) {
	hwlog.RunLog.Infof("jobId=%s, wait stress test finish....", ctl.jobInfo.JobId)
	stressTestParam := ctl.getStressTestParam()
	nodes := make([]string, 0, len(stressTestParam))
	for node := range stressTestParam {
		nodes = append(nodes, node)
	}
	faultmanager.FilterStressTestFault(ctl.jobInfo.JobId, nodes, true)
	defer faultmanager.FilterStressTestFault(ctl.jobInfo.JobId, nodes, false)
	ctl.sendAgentSignal(constant.WaitStartAgentSignalType, pauseStartAgentActions)
	cm, err := common.RetryWriteResetCM(ctl.jobInfo.JobName, ctl.jobInfo.Namespace, nil, false,
		constant.NotifyFaultFlushingOperation)
	if err != nil {
		hwlog.RunLog.Errorf("notify agent faultFlushing error, err=%v", err)
	} else {
		hwlog.RunLog.Infof("write configmap FaultFlushing success, %s", cm.Data[constant.ResetInfoCMDataKey])
	}
	// Only the channel is needed here; the context used for the wait comes from
	// the second getter, exactly as before.
	_, ch := ctl.getCtxAndResultChan()
	if ch == nil {
		hwlog.RunLog.Infof("jobId=%s, reportChan is nil", ctl.jobInfo.JobId)
		ctl.replyOMResponse("stress test failed, job service not ready")
		return "", common.ServerInnerError, fmt.Errorf("jobId=%s, reportChan is nil", ctl.jobInfo.JobId)
	}
	ctx, rch := ctl.getCtxAndStressTestResultChan()
	if rch == nil {
		hwlog.RunLog.Infof("jobId=%s, resultChan is nil", ctl.jobInfo.JobId)
		ctl.replyOMResponse("stress test failed, job service not ready")
		// The returned error names the channel that is actually nil (it previously
		// said "reportChan", contradicting the log line above).
		return "", common.ServerInnerError, fmt.Errorf("jobId=%s, resultChan is nil", ctl.jobInfo.JobId)
	}
	return ctl.waitStressTestDone(ctx, rch, ch)
}
// sendAgentSignal assigns a fresh event uuid to the controller and enqueues a
// process-manage signal of the given type with the given actions. The outcome
// is only logged; callers receive no result.
func (ctl *EventController) sendAgentSignal(signalType string, action []string) {
	ctl.uuid = common.NewEventId(randomLen)
	// ChangeStrategy is intentionally left at its zero value ("").
	agentSignal := &pb.ProcessManageSignal{
		Uuid:       ctl.uuid,
		JobId:      ctl.jobInfo.JobId,
		SignalType: signalType,
		Actions:    action,
	}
	if _, _, enqueueErr := ctl.signalEnqueue(agentSignal); enqueueErr != nil {
		hwlog.RunLog.Errorf("send signalType=%s failed, err=%v, jobId=%s", signalType, enqueueErr, ctl.jobInfo.JobId)
		return
	}
	hwlog.RunLog.Infof("send signalType=%s, jobId=%s", signalType, ctl.jobInfo.JobId)
}
// waitStressTestDone blocks until the stress test produces an outcome:
//   - controller context canceled         -> ControllerEventCancel
//   - a stress-test result arrives on rch -> the reset ConfigMap is written with
//     the clear operation, the result is parsed and relayed to OM, and the
//     event is ReceiveReportEvent on success or StressTestFailEvent otherwise
//   - a recover status report arrives on ch -> with UnRecoverableRetryError the
//     wait continues in waitStressTestFinishRecvFault; any other code ends the
//     wait with an empty event and OK
//   - reportTimeoutMinutes minutes elapse -> ReportTimeoutEvent
func (ctl *EventController) waitStressTestDone(ctx context.Context, rch chan *pb.StressTestResult,
	ch chan *pb.RecoverStatusRequest) (string, common.RespCode, error) {
	waitLimit := time.Duration(reportTimeoutMinutes) * time.Minute
	select {
	case <-ctx.Done():
		hwlog.RunLog.Warnf("controller context canceled, jobId=%s, uuid=%s", ctl.jobInfo.JobId, ctl.uuid)
		return "", common.ControllerEventCancel, nil
	case testResult := <-rch:
		_, clearErr := common.RetryWriteResetCM(ctl.jobInfo.JobName, ctl.jobInfo.Namespace, nil,
			false, constant.ClearOperation)
		if clearErr != nil {
			hwlog.RunLog.Errorf("notify agent faultFlushing error, err=%v", clearErr)
		}
		passed, summary := ctl.parseStressTestResult(testResult)
		// The summary is relayed to OM regardless of the verdict.
		ctl.replyOMResponse(summary)
		if !passed {
			return common.StressTestFailEvent, common.ClientError, nil
		}
		return common.ReceiveReportEvent, common.OK, nil
	case report := <-ch:
		if report.Status.Code == common.UnRecoverableRetryError {
			return ctl.waitStressTestFinishRecvFault(ctx, rch)
		}
		return "", common.OK, nil
	case <-time.After(waitLimit):
		hwlog.RunLog.Errorf("wait report stress test timeout, jobId=%s, uuid=%s", ctl.jobInfo.JobId, ctl.uuid)
		ctl.replyOMResponse("stress test failed, report stress test timeout")
		return common.ReportTimeoutEvent, common.WaitReportTimeout, nil
	}
}
// parseStressTestResult evaluates a stress-test result and returns whether the
// test passed together with a human-readable summary.
//
// Side effects:
//   - always sends the continue-start agent signal first;
//   - inserts into ctl.isolateNodes the node of every rank that reported
//     StressTestFindFault;
//   - passes to ctl.saveCacheFault the ranks that reported StressTestTimeout or
//     StressTestVolRecoverFail, as normal-type faults.
//
// The result is false when the job cache is missing, when ctl.isolateNodes is
// non-empty (including nodes inserted by earlier calls), or when any rank was
// collected as faulty; otherwise it is true. A rank absent from the job's rank
// table is grouped under the empty node/device name. A nil per-rank entry is
// kept in the summary but contributes no fault codes.
func (ctl *EventController) parseStressTestResult(result *pb.StressTestResult) (bool, string) {
	hwlog.RunLog.Infof("jobId=%s, StressTestResult is %v", result.JobId, result.StressResult)
	ctl.sendAgentSignal(constant.ContinueStartAgentSignalType, continueStartAgentActions)
	jobInfo, ok := job.GetJobCache(result.JobId)
	if !ok {
		hwlog.RunLog.Errorf("get job cache failed, jobId=%s", result.JobId)
		return false, fmt.Sprintf("get job cache failed, jobId=%s", result.JobId)
	}

	// Build rank -> node name and rank -> device ID lookups from the rank table.
	rankNodeMap := make(map[string]string)
	rankDevMap := make(map[string]string)
	for _, server := range jobInfo.JobRankTable.ServerList {
		for _, dev := range server.DeviceList {
			rankNodeMap[dev.RankID] = server.ServerName
			rankDevMap[dev.RankID] = dev.DeviceID
		}
	}
	hwlog.RunLog.Infof("jobId=%s, rankNodeMap is %v", result.JobId, rankNodeMap)

	// nodeRankResultMap groups the per-rank results as node name -> device ID -> result.
	nodeRankResultMap := make(map[string]map[string]*pb.StressTestRankResult)
	faultRank := make([]*pb.FaultRank, 0)
	for rankID, opResult := range result.StressResult {
		nodeName := rankNodeMap[rankID]
		devResults, found := nodeRankResultMap[nodeName]
		if !found {
			devResults = make(map[string]*pb.StressTestRankResult)
			nodeRankResultMap[nodeName] = devResults
		}
		devResults[rankDevMap[rankID]] = opResult
		if opResult == nil {
			// Guard against a nil entry so that reading RankResult cannot panic.
			hwlog.RunLog.Warnf("jobId=%s, rank %s has nil stress test result", result.JobId, rankID)
			continue
		}
		for _, res := range opResult.RankResult {
			if res.Code == constant.StressTestFindFault {
				ctl.isolateNodes.Insert(nodeName)
			}
			if res.Code == constant.StressTestTimeout || res.Code == constant.StressTestVolRecoverFail {
				faultRank = append(faultRank, &pb.FaultRank{RankId: rankID, FaultType: constant.NormalFaultType})
			}
		}
	}
	ctl.saveCacheFault(faultRank)

	retStr := util.ObjToString(nodeRankResultMap)
	hwlog.RunLog.Infof("jobId=%s, isolateNode:%v result:%v", result.JobId, ctl.isolateNodes, retStr)
	if len(ctl.isolateNodes) > 0 {
		return false, fmt.Sprintf("stress test find fault, isolate node:%v,result:%v", ctl.isolateNodes, retStr)
	}
	if len(faultRank) > 0 {
		return false, fmt.Sprintf("stress test timeout fault, faultRank:%v,result:%v", faultRank, retStr)
	}
	return true, fmt.Sprintf("stress test finish, result:%v", retStr)
}
// setStressTestResult hands result to whoever is receiving on the stress-test
// result channel. The send has no timeout or cancellation path, so it blocks
// until the channel accepts the value.
func (ctl *EventController) setStressTestResult(result *pb.StressTestResult) {
	resultChan := ctl.stressTestResult
	resultChan <- result
}
// getCtxAndStressTestResultChan returns the controller context and the channel
// on which stress-test results are delivered, both read under the read lock.
func (ctl *EventController) getCtxAndStressTestResultChan() (context.Context, chan *pb.StressTestResult) {
	ctl.lock.RLock()
	ctrlCtx, resultChan := ctl.controllerContext, ctl.stressTestResult
	ctl.lock.RUnlock()
	return ctrlCtx, resultChan
}
// getCtxAndStressTestResponseChan returns the controller context and the
// stress-test response channel, both read under the read lock.
func (ctl *EventController) getCtxAndStressTestResponseChan() (context.Context, chan *pb.StressTestResponse) {
	ctl.lock.RLock()
	ctrlCtx, responseChan := ctl.controllerContext, ctl.stressTestResponse
	ctl.lock.RUnlock()
	return ctrlCtx, responseChan
}
// handleStressTestFail reacts to a failed stress test.
//
// For every node in ctl.isolateNodes it marks all ranks of that node (per the
// job's rank table) as normal-type faults and patches the node with the
// unhealthy annotation; a patch failure is logged and does not stop the loop.
// The collected faults are passed to ctl.saveCacheFault, then a stop-train
// signal with a fresh uuid is enqueued carrying the cached retry faults
// followed by the cached normal faults.
//
// It returns ServerInnerError with an error when the job cache is missing,
// otherwise the result of ctl.signalEnqueue.
func (ctl *EventController) handleStressTestFail() (string, common.RespCode, error) {
	jobInfo, ok := job.GetJobCache(ctl.jobInfo.JobId)
	if !ok {
		hwlog.RunLog.Errorf("get job cache failed, jobId=%s", ctl.jobInfo.JobId)
		return "", common.ServerInnerError, fmt.Errorf("get job cache failed, jobId=%s", ctl.jobInfo.JobId)
	}

	// nodeRankMap maps a server name to the rank IDs of its devices.
	nodeRankMap := make(map[string][]string, len(jobInfo.JobRankTable.ServerList))
	for _, server := range jobInfo.JobRankTable.ServerList {
		ranks := make([]string, 0, len(server.DeviceList))
		for _, dev := range server.DeviceList {
			ranks = append(ranks, dev.RankID)
		}
		nodeRankMap[server.ServerName] = ranks
	}

	faultRank := make([]*pb.FaultRank, 0)
	for node := range ctl.isolateNodes {
		for _, rank := range nodeRankMap[node] {
			faultRank = append(faultRank, &pb.FaultRank{RankId: rank, FaultType: constant.NormalFaultType})
		}
		annotations := map[string]string{constant.NodeHealthyStatusKey: constant.NodeUnHealthy}
		if err := kube.RetryPatchNodeAnnotation(node, constant.PatchNodeTimes, annotations); err != nil {
			hwlog.RunLog.Errorf("patch node:%s failed: %v", node, err)
		}
	}
	ctl.saveCacheFault(faultRank)

	ctl.uuid = common.NewEventId(randomLen)
	signal := &pb.ProcessManageSignal{
		Uuid:           ctl.uuid,
		JobId:          ctl.jobInfo.JobId,
		SignalType:     constant.StopTrainSignalType,
		Actions:        stopTrainActions,
		ChangeStrategy: "",
	}
	// Build the combined list in a fresh slice: appending directly onto
	// ctl.cacheRetryFault could write into its spare capacity and alias the
	// controller's cache with the outgoing signal.
	allFaults := make([]*pb.FaultRank, 0, len(ctl.cacheRetryFault)+len(ctl.cacheNormalFault))
	allFaults = append(allFaults, ctl.cacheRetryFault...)
	allFaults = append(allFaults, ctl.cacheNormalFault...)
	signal.FaultRanks = allFaults
	return ctl.signalEnqueue(signal)
}
// handleStressTestFinish ends the stress-test flow by calling ctl.reset(false).
// It always returns an empty event with OK and a nil error.
func (ctl *EventController) handleStressTestFinish() (string, common.RespCode, error) {
	const noEvent = ""
	ctl.reset(false)
	return noEvent, common.OK, nil
}
// listenStressTestChannel forwards at most one stress-test response from the
// controller's response channel to the given stream (see
// selectSendStressTestResponseChan for the exit conditions).
func (ctl *EventController) listenStressTestChannel(stream pb.Recover_SubscribeStressTestResponseServer) {
	ctrlCtx, responseChan := ctl.getCtxAndStressTestResponseChan()
	hwlog.RunLog.Infof("start listen a new send channel, jobId=%s", ctl.jobInfo.JobId)
	ctl.selectSendStressTestResponseChan(ctrlCtx, responseChan, stream)
}
// selectSendStressTestResponseChan waits once for a stress-test response and
// relays it to the stream. It returns without sending when the channel is nil
// or closed, when either context is done, or after reportTimeoutMinutes
// minutes. A failed relay is only logged.
func (ctl *EventController) selectSendStressTestResponseChan(ctx context.Context, sendChan chan *pb.StressTestResponse,
	stream pb.Recover_SubscribeStressTestResponseServer) {
	if sendChan == nil {
		return
	}
	waitLimit := time.Duration(reportTimeoutMinutes) * time.Minute
	select {
	case <-ctx.Done():
		hwlog.RunLog.Infof("context done, jobId=%s break listen sendChan", ctl.jobInfo.JobId)
		return
	case <-stream.Context().Done():
		hwlog.RunLog.Infof("stream context done, jobId=%s break listen sendChan", ctl.jobInfo.JobId)
		return
	case response, open := <-sendChan:
		if !open {
			hwlog.RunLog.Infof("sendChan closed, jobId=%s break listen sendChan", ctl.jobInfo.JobId)
			return
		}
		hwlog.RunLog.Infof("stress test signal=%v, jobId=%s", response, ctl.jobInfo.JobId)
		if sendErr := common.SendWithRetry(stream, response, retryTimes); sendErr != nil {
			hwlog.RunLog.Errorf("send stress test signal failed, err=%v, jobId=%s", sendErr, ctl.jobInfo.JobId)
		}
		return
	case <-time.After(waitLimit):
		hwlog.RunLog.Errorf("report stress test result timeout, jobId=%s, uuid=%s", ctl.jobInfo.JobId, ctl.uuid)
		// NOTE(review): replyOMResponse sends on ctl.stressTestResponse, which is
		// presumably the channel this function was consuming — confirm it is
		// buffered or has another reader, otherwise this send could block.
		ctl.replyOMResponse("report stress test result timeout, please check manually")
		return
	}
}