package handler
import (
"fmt"
"github.com/goodrain/rainbond/pkg/component/mq"
"strings"
"time"
"github.com/goodrain/rainbond/api/model"
"github.com/goodrain/rainbond/db"
dbmodel "github.com/goodrain/rainbond/db/model"
"github.com/goodrain/rainbond/event"
gclient "github.com/goodrain/rainbond/mq/client"
"github.com/goodrain/rainbond/util"
dmodel "github.com/goodrain/rainbond/worker/discover/model"
"github.com/jinzhu/gorm"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type OperationHandler struct {
mqCli gclient.MQClient
dryRun bool
helmChart *model.HelmChart
eventIDs []string
end bool
}
type OperationResult struct {
ServiceID string `json:"service_id"`
Operation string `json:"operation"`
EventID string `json:"event_id"`
Status string `json:"status"`
ErrMsg string `json:"err_message"`
DeployVersion string `json:"deploy_version"`
}
func CreateOperationHandler() *OperationHandler {
return &OperationHandler{
mqCli: mq.Default().MqClient,
}
}
func (o *OperationHandler) SetHelmParameter(dryRun bool, helmChart *model.HelmChart, eventIDs []string, end bool) {
o.helmChart = helmChart
o.dryRun = dryRun
o.eventIDs = eventIDs
o.end = end
}
func (o *OperationHandler) Build(batchOpReq model.ComponentOpReq) (*model.ComponentOpResult, error) {
res := batchOpReq.BatchOpFailureItem()
if err := o.build(batchOpReq); err != nil {
res.ErrMsg = err.Error()
} else {
res.Success()
}
return res, nil
}
func (o *OperationHandler) build(batchOpReq model.ComponentOpReq) error {
if logrus.IsLevelEnabled(logrus.DebugLevel) {
util.Elapsed(fmt.Sprintf("build component(%s)", batchOpReq.GetComponentID()))()
}
service, err := db.GetManager().TenantServiceDao().GetServiceByID(batchOpReq.GetComponentID())
if err != nil {
return err
}
if dbmodel.ServiceKind(service.Kind) == dbmodel.ServiceKindThirdParty {
return nil
}
buildReq := batchOpReq.(*model.ComponentBuildReq)
buildReq.DeployVersion = util.CreateVersionByTime()
version := dbmodel.VersionInfo{
EventID: buildReq.GetEventID(),
ServiceID: buildReq.ServiceID,
RepoURL: buildReq.CodeInfo.RepoURL,
Kind: buildReq.Kind,
BuildVersion: buildReq.DeployVersion,
Cmd: buildReq.ImageInfo.Cmd,
Author: buildReq.Operator,
FinishTime: time.Now(),
PlanVersion: buildReq.PlanVersion,
}
if buildReq.CodeInfo.Cmd != "" && buildReq.CodeInfo.BuildType != "cnb" {
version.Cmd = buildReq.CodeInfo.Cmd
}
if err = db.GetManager().VersionInfoDao().AddModel(&version); err != nil {
return err
}
switch buildReq.Kind {
case model.FromImageBuildKing:
if err := o.buildFromImage(buildReq, service); err != nil {
return err
}
case model.FromCodeBuildKing:
if err := o.buildFromSourceCode(buildReq, service); err != nil {
return err
}
case model.FromMarketImageBuildKing:
if err := o.buildFromImage(buildReq, service); err != nil {
return err
}
case model.FromMarketSlugBuildKing:
if err := o.buildFromMarketSlug(buildReq, service); err != nil {
return err
}
case model.ExportHelmChart:
version.DeliveredType = "image"
version.DeliveredPath = buildReq.ImageInfo.ImageURL
version.ImageName = buildReq.ImageInfo.ImageURL
version.RepoURL = buildReq.ImageInfo.ImageURL
version.FinalStatus = "success"
version.FinishTime = time.Now()
err = db.GetManager().TenantServiceDao().UpdateDeployVersion(buildReq.ServiceID, buildReq.DeployVersion)
if err != nil {
return err
}
err = db.GetManager().VersionInfoDao().UpdateModel(&version)
if err != nil {
return err
}
if err = o.exportHelmChart(buildReq, service); err != nil {
return err
}
case model.FromVMBuildKing:
version.ImageName = buildReq.ImageInfo.ImageURL
err = db.GetManager().VersionInfoDao().UpdateModel(&version)
if err := o.buildFromVM(buildReq, service); err != nil {
return err
}
case model.FromKubeBlocksBuildKind:
if err := o.buildFromKubeBlocks(buildReq, service); err != nil {
return err
}
version.FinalStatus = "success"
version.FinishTime = time.Now()
err = db.GetManager().VersionInfoDao().UpdateModel(&version)
if err != nil {
return err
}
logger := event.GetManager().GetLogger(buildReq.GetEventID())
defer event.GetManager().ReleaseLogger(logger)
logger.Info("Build success", map[string]string{"step": "last", "status": "success"})
default:
return errors.New("unsupported build kind: " + buildReq.Kind)
}
return nil
}
func (o *OperationHandler) Stop(batchOpReq model.ComponentOpReq) error {
service, err := db.GetManager().TenantServiceDao().GetServiceByID(batchOpReq.GetComponentID())
if err != nil {
return err
}
body := batchOpReq.TaskBody(service)
err = o.mqCli.SendBuilderTopic(gclient.TaskStruct{
TaskType: "stop",
TaskBody: body,
Topic: gclient.WorkerTopic,
})
if err != nil {
return err
}
return nil
}
func (o *OperationHandler) Start(batchOpReq model.ComponentOpReq) error {
service, err := db.GetManager().TenantServiceDao().GetServiceByID(batchOpReq.GetComponentID())
if err != nil {
return err
}
body := batchOpReq.TaskBody(service)
err = o.mqCli.SendBuilderTopic(gclient.TaskStruct{
TaskType: "start",
TaskBody: body,
Topic: gclient.WorkerTopic,
})
if err != nil {
return err
}
return nil
}
func (o *OperationHandler) Upgrade(batchOpReq model.ComponentOpReq) (*model.ComponentOpResult, error) {
res := batchOpReq.BatchOpFailureItem()
if err := o.upgrade(batchOpReq); err != nil {
res.ErrMsg = err.Error()
} else {
res.Success()
}
return res, nil
}
func (o *OperationHandler) upgrade(batchOpReq model.ComponentOpReq) error {
component, err := db.GetManager().TenantServiceDao().GetServiceByID(batchOpReq.GetComponentID())
if err != nil {
return err
}
batchOpReq.SetVersion(component.DeployVersion)
version, err := db.GetManager().VersionInfoDao().GetVersionByDeployVersion(batchOpReq.GetVersion(), batchOpReq.GetComponentID())
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
return err
}
oldDeployVersion := component.DeployVersion
var rollback = func() {
component.DeployVersion = oldDeployVersion
_ = db.GetManager().TenantServiceDao().UpdateModel(component)
}
if version != nil {
if version.FinalStatus != "success" {
logrus.Warnf("deploy version %s is not build success,can not change deploy version in this upgrade event", batchOpReq.GetVersion())
} else {
component.DeployVersion = batchOpReq.GetVersion()
err = db.GetManager().TenantServiceDao().UpdateModel(component)
if err != nil {
return err
}
}
}
body := batchOpReq.TaskBody(component)
err = o.mqCli.SendBuilderTopic(gclient.TaskStruct{
TaskBody: body,
TaskType: "rolling_upgrade",
Topic: gclient.WorkerTopic,
})
if err != nil {
rollback()
return err
}
return nil
}
func (o *OperationHandler) RollBack(rollback model.RollbackInfoRequestStruct) (re OperationResult) {
re.Operation = "rollback"
re.ServiceID = rollback.ServiceID
re.EventID = rollback.EventID
re.Status = "failure"
service, err := db.GetManager().TenantServiceDao().GetServiceByID(rollback.ServiceID)
if err != nil {
logrus.Errorf("find service %s failure %s", rollback.ServiceID, err.Error())
re.ErrMsg = fmt.Sprintf("find service %s failure", rollback.ServiceID)
return
}
if dbmodel.ServiceKind(service.Kind) == dbmodel.ServiceKindThirdParty {
re.ErrMsg = fmt.Sprintf("service %s is thirdpart service", rollback.ServiceID)
return
}
if service.IsKubeBlocksComponent() {
re.ErrMsg = fmt.Sprintf("service %s is kubeblocks component", rollback.ServiceID)
return
}
oldDeployVersion := service.DeployVersion
var rollbackFunc = func() {
service.DeployVersion = oldDeployVersion
_ = db.GetManager().TenantServiceDao().UpdateModel(service)
}
if service.DeployVersion == rollback.RollBackVersion {
logrus.Warningf("rollback version is same of current version")
}
service.DeployVersion = rollback.RollBackVersion
if err := db.GetManager().TenantServiceDao().UpdateModel(service); err != nil {
logrus.Errorf("update service %s version failure %s", rollback.ServiceID, err.Error())
re.ErrMsg = fmt.Sprintf("update service %s version failure", rollback.ServiceID)
return
}
err = o.mqCli.SendBuilderTopic(gclient.TaskStruct{
TaskBody: dmodel.RollingUpgradeTaskBody{
TenantID: service.TenantID,
ServiceID: service.ServiceID,
NewDeployVersion: rollback.RollBackVersion,
EventID: rollback.EventID,
},
TaskType: "rolling_upgrade",
Topic: gclient.WorkerTopic,
})
if err != nil {
rollbackFunc()
logrus.Errorf("equque rollback message error, %v", err)
re.ErrMsg = fmt.Sprintf("send service %s rollback message failure", rollback.ServiceID)
return
}
re.Status = "success"
return
}
func (o *OperationHandler) buildFromMarketSlug(r *model.ComponentBuildReq, service *dbmodel.TenantServices) error {
body := make(map[string]interface{})
body["deploy_version"] = r.DeployVersion
body["event_id"] = r.GetEventID()
body["action"] = r.Action
body["tenant_name"] = r.TenantName
body["tenant_id"] = service.TenantID
body["service_id"] = service.ServiceID
body["service_alias"] = service.ServiceAlias
body["slug_info"] = r.SlugInfo
body["configs"] = r.Configs
return o.sendBuildTopic(service.ServiceID, "build_from_market_slug", body, r.Arch)
}
func (o *OperationHandler) sendBuildTopic(serviceID, taskType string, body map[string]interface{}, arch string) error {
topic := gclient.BuilderTopic
if o.isWindowsService(serviceID) {
topic = gclient.WindowsBuilderTopic
}
return o.mqCli.SendBuilderTopic(gclient.TaskStruct{
Topic: topic,
TaskType: taskType,
TaskBody: body,
Arch: arch,
})
}
func (o *OperationHandler) exportHelmChart(r *model.ComponentBuildReq, service *dbmodel.TenantServices) error {
body := dmodel.RollingUpgradeTaskBody{
TenantID: service.TenantID,
ServiceID: service.ServiceID,
NewDeployVersion: r.DeployVersion,
EventID: "",
Configs: r.Configs,
AppVersion: o.helmChart.AppVersion,
AppName: o.helmChart.AppName,
DryRun: o.dryRun,
EventIDs: o.eventIDs,
End: o.end,
}
return o.mqCli.SendBuilderTopic(gclient.TaskStruct{
Topic: gclient.WorkerTopic,
TaskType: "rolling_upgrade",
TaskBody: body,
})
}
func (o *OperationHandler) buildFromImage(r *model.ComponentBuildReq, service *dbmodel.TenantServices) error {
if logrus.IsLevelEnabled(logrus.DebugLevel) {
util.Elapsed(fmt.Sprintf("[buildFromImage] build component(%s)", r.GetComponentID()))()
}
if r.ImageInfo.ImageURL == "" || r.DeployVersion == "" {
return fmt.Errorf("build from image failure, args error")
}
body := make(map[string]interface{})
body["image"] = r.ImageInfo.ImageURL
body["service_id"] = service.ServiceID
body["deploy_version"] = r.DeployVersion
body["namespace"] = service.Namespace
body["event_id"] = r.GetEventID()
body["tenant_name"] = r.TenantName
body["service_alias"] = service.ServiceAlias
body["action"] = r.Action
body["code_from"] = "image_manual"
if r.ImageInfo.User != "" && r.ImageInfo.Password != "" {
body["user"] = r.ImageInfo.User
body["password"] = r.ImageInfo.Password
}
body["configs"] = r.Configs
return o.sendBuildTopic(service.ServiceID, "build_from_image", body, r.Arch)
}
func (o *OperationHandler) buildFromSourceCode(r *model.ComponentBuildReq, service *dbmodel.TenantServices) error {
if r.CodeInfo.RepoURL == "" || r.CodeInfo.Branch == "" || r.DeployVersion == "" {
logrus.Infof("r.CodeInfo.RepoURL:%v, r.CodeInfo.Branch:%v, r.DeployVersion:%v", r.CodeInfo.RepoURL, r.CodeInfo.Branch, r.DeployVersion)
return fmt.Errorf("build from code failure, args error")
}
body := make(map[string]interface{})
body["tenant_id"] = service.TenantID
body["service_id"] = service.ServiceID
body["repo_url"] = r.CodeInfo.RepoURL
body["action"] = r.Action
body["lang"] = r.CodeInfo.Lang
body["runtime"] = r.CodeInfo.Runtime
body["deploy_version"] = r.DeployVersion
body["event_id"] = r.GetEventID()
body["envs"] = r.BuildENVs
body["tenant_name"] = r.TenantName
body["branch"] = r.CodeInfo.Branch
body["server_type"] = r.CodeInfo.ServerType
body["service_alias"] = service.ServiceAlias
if r.CodeInfo.User != "" && r.CodeInfo.Password != "" {
body["user"] = r.CodeInfo.User
body["password"] = r.CodeInfo.Password
}
if r.CodeInfo.DockerfilePath != "" {
body["dockerfile_path"] = r.CodeInfo.DockerfilePath
}
buildStrategy := strings.TrimSpace(r.CodeInfo.BuildStrategy)
if buildStrategy == "" {
buildStrategy = strings.TrimSpace(r.CodeInfo.BuildType)
}
if r.CodeInfo.BuildType != "" {
body["build_type"] = r.CodeInfo.BuildType
}
if buildStrategy != "" {
body["build_strategy"] = buildStrategy
}
if r.CodeInfo.CNBVersionPolicy != nil {
body["cnb_version_policy"] = r.CodeInfo.CNBVersionPolicy
}
body["expire"] = 180
body["configs"] = r.Configs
return o.sendBuildTopic(service.ServiceID, "build_from_source_code", body, r.Arch)
}
func (o *OperationHandler) isWindowsService(serviceID string) bool {
label, err := db.GetManager().TenantServiceLabelDao().GetLabelByNodeSelectorKey(serviceID, "windows")
if label == nil || err != nil {
return false
}
return true
}
func (o *OperationHandler) buildFromVM(r *model.ComponentBuildReq, service *dbmodel.TenantServices) error {
if logrus.IsLevelEnabled(logrus.DebugLevel) {
util.Elapsed(fmt.Sprintf("[buildFromImage] build component(%s)", r.GetComponentID()))()
}
if r.ImageInfo.ImageURL == "" || r.DeployVersion == "" {
return fmt.Errorf("build from image failure, args error")
}
body := make(map[string]interface{})
body["arch"] = r.Arch
body["vm_image_source"] = r.ImageInfo.VMImageSource
body["service_id"] = r.ServiceID
body["deploy_version"] = r.DeployVersion
body["tenant_id"] = service.TenantID
body["configs"] = r.Configs
body["action"] = r.Action
body["event_id"] = r.EventID
body["image"] = r.ImageInfo.ImageURL
return o.sendBuildTopic(service.ServiceID, "build_from_vm", body, r.Arch)
}
func (o *OperationHandler) buildFromKubeBlocks(r *model.ComponentBuildReq, service *dbmodel.TenantServices) error {
if logrus.IsLevelEnabled(logrus.DebugLevel) {
util.Elapsed(fmt.Sprintf("[buildFromKubeBlocks] build component(%s)", r.GetComponentID()))()
}
if r.DeployVersion == "" || service.ServiceID == "" {
return fmt.Errorf("build from kubeblocks failure, args error")
}
body := make(map[string]interface{})
body["service_id"] = service.ServiceID
body["deploy_version"] = r.DeployVersion
body["namespace"] = service.Namespace
body["event_id"] = r.GetEventID()
body["tenant_name"] = r.TenantName
body["tenant_id"] = service.TenantID
body["service_alias"] = service.ServiceAlias
body["action"] = r.Action
body["configs"] = r.Configs
return o.sendBuildTopic(service.ServiceID, "build_from_kubeblocks", body, r.Arch)
}